Re: Starting a job that does not use checkpointing from a savepoint is broken ?

2018-01-18 Thread jelmer
Hey Eron,

Thanks, you stated the issue better and more compactly than I could.

I will not debate the wisdom of not using checkpoints, but when migrating
jobs you may not be aware whether a job has checkpointing enabled if you are
not the author, and if you follow the upgrade guide to the letter you end
up seriously breaking the job.

Somewhere something is wrong, be it in the documentation or the implementation.



On 19 January 2018 at 02:05, Eron Wright  wrote:

> To restate the issue:
> When checkpointing is disabled, the Flink Kafka Consumer relies on the
> periodic offsets that are committed to the broker by the internal Kafka
> client.  Such a job would, upon restart, continue from the committed
> offsets. However, if the job is restored from a
> savepoint, then the offsets within the savepoint supersede the broker-based
> offsets.
>
> It seems a bit unusual to use the savepoint feature on a job that doesn't
> have checkpointing enabled. It makes me wonder whether
> `StreamExecutionEnvironment::enableCheckpointing` is best understood as
> enabling +periodic+ checkpointing.
>
> The docs say that the periodic offset commit feature is not intended for
> fault tolerance, implying to me that you should use Flink's checkpointing
> feature.  A great reason to use Flink checkpointing is to capture the
> intermediate state of the job, such as window state, in addition to the
> consumer offsets.
>
> I hope this helps,
> Eron
>
>
>
>
>
> On Thu, Jan 18, 2018 at 3:26 PM, jelmer  wrote:
>
>> I ran into a rather annoying issue today while upgrading a Flink job
>> from Flink 1.3.2 to 1.4.0.
>>
>> This particular job does not use checkpointing nor state.
>>
>> I followed the instructions at https://ci.apache.org/projects
>> /flink/flink-docs-release-1.4/ops/upgrading.html
>>
>> First created a savepoint, upgraded the cluster, then restarted the job
>> from the savepoint.
>>
>> This all went well until a few hours later, when one of our Kafka nodes
>> died. This triggered an exception in the job, which was subsequently
>> restarted.
>>
>> However, instead of picking up where it left off based on the offsets
>> committed to Kafka (which is what should happen according to
>> https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.4/dev/connectors/kafka.html), the Kafka offsets were reset to the
>> point when I made the savepoint 3 hours earlier, and so it started
>> reprocessing millions of messages.
>>
>> Needless to say, creating a savepoint for a job without state or
>> checkpoints does not make that much sense. But I would not expect a restart
>> from a savepoint to completely break a job in the case of failure.
>>
>> I created a repository that reproduces the scenario I encountered
>>
>> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>>
>> Am I misunderstanding anything, or should I file a bug for this?
>>
>>
>>
>


Re: Starting a job that does not use checkpointing from a savepoint is broken ?

2018-01-18 Thread Eron Wright
To restate the issue:
When checkpointing is disabled, the Flink Kafka Consumer relies on the
periodic offsets that are committed to the broker by the internal Kafka
client.  Such a job would, upon restart, continue from the committed
offsets. However, if the job is restored from a
savepoint, then the offsets within the savepoint supersede the broker-based
offsets.

It seems a bit unusual to use the savepoint feature on a job that doesn't
have checkpointing enabled. It makes me wonder whether
`StreamExecutionEnvironment::enableCheckpointing` is best understood as
enabling +periodic+ checkpointing.
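
For illustration, a minimal sketch of enabling periodic checkpointing (the
60-second interval and the placeholder source are arbitrary; a real job would
use the Kafka consumer as its source):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot operator state (including the Kafka consumer's partition offsets)
        // every 60 seconds; a restore from a checkpoint or savepoint then uses those
        // offsets instead of whatever was last committed to the broker.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder pipeline, just so the sketch runs on its own.
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointing-enabled-job");
    }
}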

The docs say that the periodic offset commit feature is not intended for
fault tolerance, implying to me that you should use Flink's checkpointing
feature.  A great reason to use Flink checkpointing is to capture the
intermediate state of the job, such as window state, in addition to the
consumer offsets.

I hope this helps,
Eron





On Thu, Jan 18, 2018 at 3:26 PM, jelmer  wrote:

> I ran into a rather annoying issue today while upgrading a Flink job
> from Flink 1.3.2 to 1.4.0.
>
> This particular job does not use checkpointing nor state.
>
> I followed the instructions at https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/ops/upgrading.html
>
> First created a savepoint, upgraded the cluster, then restarted the job
> from the savepoint.
>
> This all went well until a few hours later, when one of our Kafka nodes
> died. This triggered an exception in the job, which was subsequently
> restarted.
>
> However, instead of picking up where it left off based on the offsets
> committed to Kafka (which is what should happen according to
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html), the Kafka offsets were reset to
> the point when I made the savepoint 3 hours earlier, and so it started
> reprocessing millions of messages.
>
> Needless to say, creating a savepoint for a job without state or
> checkpoints does not make that much sense. But I would not expect a restart
> from a savepoint to completely break a job in the case of failure.
>
> I created a repository that reproduces the scenario I encountered
>
> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>
> Am I misunderstanding anything, or should I file a bug for this?
>
>
>


Starting a job that does not use checkpointing from a savepoint is broken ?

2018-01-18 Thread jelmer
I ran into a rather annoying issue today while upgrading a Flink job from
Flink 1.3.2 to 1.4.0.

This particular job does not use checkpointing nor state.

I followed the instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html

First created a savepoint, upgraded the cluster, then restarted the job
from the savepoint.

This all went well until a few hours later, when one of our Kafka nodes
died. This triggered an exception in the job, which was subsequently
restarted.

However, instead of picking up where it left off based on the offsets
committed to Kafka (which is what should happen according to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html),
the Kafka offsets were reset to the point when I made the savepoint 3
hours earlier, and so it started reprocessing millions of messages.

Needless to say, creating a savepoint for a job without state or
checkpoints does not make that much sense. But I would not expect a restart
from a savepoint to completely break a job in the case of failure.

I created a repository that reproduces the scenario I encountered

https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing

Am I misunderstanding anything, or should I file a bug for this?


Re: Failing to recover once checkpoint fails

2018-01-18 Thread Vishal Santoshi
Or this one

https://issues.apache.org/jira/browse/FLINK-4815

On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi 
wrote:

> ping.
>
> This happened again on production and it seems reasonable to abort
> when a checkpoint is not found rather than behave as if it is a brand new
> pipeline.
>
> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Folks, sorry for being late on this. Can somebody with knowledge of
>> this code base create a Jira issue for the above? We have seen this more
>> than once in production.
>>
>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> Some relevant Jira issues for you are:
>>>
>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>> failed checkpoints
>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
>>> to earlier checkpoint when checkpoint restore fails
>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 9. Oct 2017, at 09:06, Fabian Hueske  wrote:
>>>
>>> Hi Vishal,
>>>
>>> it would be great if you could create a JIRA ticket with Blocker
>>> priority.
>>> Please add all relevant information of your detailed analysis, add a
>>> link to this email thread (see [1] for the web archive of the mailing
>>> list), and post the id of the JIRA issue here.
>>>
>>> Thanks for looking into this!
>>>
>>> Best regards,
>>> Fabian
>>>
>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>
>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi :
>>>
 Thank you for confirming.


 I think this is a critical bug. In essence any checkpoint store
 (hdfs/S3/File) will lose state if it is unavailable at resume. This
 becomes all the more painful with your confirming that "failed
 checkpoints kill the job", because essentially it means that if the remote
 store is unavailable during a checkpoint then you have lost state (unless, of
 course, you have a retry of none or an unbounded retry delay, a delay that
 you *hope* the store revives in). Remember, the first retry failure
 will cause new state, according to the code as written, iff the remote store is
 down. We would rather have a configurable property that establishes our
 desire to abort, something like an "abort_retry_on_chkretrevalfailure".


 In our case it is very important that we do not undercount a window,
 one reason we use Flink and its awesome failure guarantees, as various
 alarms sound (we do anomaly detection on the time series).

 Please create a jira ticket for us to follow or we could do it.


 PS Not aborting on checkpointing failures, up to a configurable limit, is very
 important too.


 On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek 
 wrote:

> Hi Vishal,
>
> I think you're right! And thanks for looking into this so deeply.
>
> With your last mail you're basically saying that the checkpoint could
> not be restored because your HDFS was temporarily down. If Flink had not
> deleted that checkpoint it might have been possible to restore it at a
> later point, right?
>
> Regarding failed checkpoints killing the job: yes, this is currently
> the expected behaviour but there are plans to change this.
>
> Best,
> Aljoscha
>
> On 5. Oct 2017, at 17:40, Vishal Santoshi 
> wrote:
>
> I think this is the offending piece. There is a catch-all Exception,
> which IMHO should distinguish a recoverable exception from an unrecoverable
> one.
>
>
> try {
>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>     if (completedCheckpoint != null) {
>         completedCheckpoints.add(completedCheckpoint);
>     }
> } catch (Exception e) {
>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>         "checkpoint store.", e);
>     // remove the checkpoint with broken state handle
>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
> }
>
> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> So this is the issue; tell us if this is wrong. ZK had some state
>> (backed by hdfs) that referred to a checkpoint (the exact same last
>> successful checkpoint that had succeeded before the NN screwed us). When the
>> JM tried to recreate the state, it failed to retrieve the CHK handle from
>> hdfs because the NN was down, and conveniently (and I think very wrongly) removed
>> the CHK from being considered and cleaned the pointer (though that failed too, as
>> the NN was down, which is obvious from the dangling file in recovery). The
>> metadata itself was on hdfs and failure in retrieving should ha

Re: Failing to recover once checkpoint fails

2018-01-18 Thread Vishal Santoshi
ping.

This happened again on production and it seems reasonable to abort when
a checkpoint is not found rather than behave as if it is a brand new
pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi 
wrote:

> Folks, sorry for being late on this. Can somebody with knowledge of
> this code base create a Jira issue for the above? We have seen this more
> than once in production.
>
> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek 
> wrote:
>
>> Hi Vishal,
>>
>> Some relevant Jira issues for you are:
>>
>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>> failed checkpoints
>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
>> to earlier checkpoint when checkpoint restore fails
>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove
>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>
>> Best,
>> Aljoscha
>>
>>
>> On 9. Oct 2017, at 09:06, Fabian Hueske  wrote:
>>
>> Hi Vishal,
>>
>> it would be great if you could create a JIRA ticket with Blocker priority.
>> Please add all relevant information of your detailed analysis, add a link
>> to this email thread (see [1] for the web archive of the mailing list), and
>> post the id of the JIRA issue here.
>>
>> Thanks for looking into this!
>>
>> Best regards,
>> Fabian
>>
>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>
>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi :
>>
>>> Thank you for confirming.
>>>
>>>
>>> I think this is a critical bug. In essence any checkpoint store
>>> (hdfs/S3/File) will lose state if it is unavailable at resume. This
>>> becomes all the more painful with your confirming that "failed
>>> checkpoints kill the job", because essentially it means that if the remote
>>> store is unavailable during a checkpoint then you have lost state (unless, of
>>> course, you have a retry of none or an unbounded retry delay, a delay that
>>> you *hope* the store revives in). Remember, the first retry failure
>>> will cause new state, according to the code as written, iff the remote store is
>>> down. We would rather have a configurable property that establishes our
>>> desire to abort, something like an "abort_retry_on_chkretrevalfailure".
>>>
>>>
>>> In our case it is very important that we do not undercount a window, one
>>> reason we use Flink and its awesome failure guarantees, as various alarms
>>> sound (we do anomaly detection on the time series).
>>>
>>> Please create a jira ticket for us to follow or we could do it.
>>>
>>>
>>> PS Not aborting on checkpointing failures, up to a configurable limit, is very
>>> important too.
>>>
>>>
>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi Vishal,

 I think you're right! And thanks for looking into this so deeply.

 With your last mail you're basically saying that the checkpoint could
 not be restored because your HDFS was temporarily down. If Flink had not
 deleted that checkpoint it might have been possible to restore it at a
 later point, right?

 Regarding failed checkpoints killing the job: yes, this is currently
 the expected behaviour but there are plans to change this.

 Best,
 Aljoscha

 On 5. Oct 2017, at 17:40, Vishal Santoshi 
 wrote:

 I think this is the offending piece. There is a catch-all Exception,
 which IMHO should distinguish a recoverable exception from an unrecoverable
 one.


 try {
     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
     if (completedCheckpoint != null) {
         completedCheckpoints.add(completedCheckpoint);
     }
 } catch (Exception e) {
     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
         "checkpoint store.", e);
     // remove the checkpoint with broken state handle
     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
 }

 On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> So this is the issue; tell us if this is wrong. ZK had some state
> (backed by hdfs) that referred to a checkpoint (the exact same last
> successful checkpoint that had succeeded before the NN screwed us). When the
> JM tried to recreate the state, it failed to retrieve the CHK handle from
> hdfs because the NN was down, and conveniently (and I think very wrongly) removed
> the CHK from being considered and cleaned the pointer (though that failed too, as
> the NN was down, which is obvious from the dangling file in recovery). The
> metadata itself was on hdfs, and a failure in retrieving it should have been a
> stop-all, not an attempt to do magic with the exception rather than starting
> from a blank state.
>
> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
> 44286 from state handle under /0044286. This indicates that 
> the
> retrieved st

NoClassDefFoundError of a Avro class after cancel then resubmit the same job

2018-01-18 Thread xiatao123
Not sure why, but when I submit the job for the first time after a cluster launch,
it works fine.
After I cancel the first job and then resubmit the same job, it hits the
NoClassDefFoundError.
Very weird; it feels like some cleanup of a cancelled job messes up future jobs
that use the same classes.
Has anyone run into the same issue?

java.lang.NoClassDefFoundError: com.xxx.yyy.zzz$Builder
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethods(Class.java:1975)
at
org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods(TypeExtractionUtils.java:243)
at
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1949)
at
org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:55)
at
org.apache.flink.api.java.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:48)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1810)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1716)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:953)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:814)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:768)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:764)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createSubclassSerializer(PojoSerializer.java:1129)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.getSubclassSerializer(PojoSerializer.java:1122)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:253)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:309)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:408)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:263)
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:209)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.xxx.yyy.zzz$Builder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 31 more




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flowable <-> flink integration

2018-01-18 Thread Martin Grofčík
Hi Maciek,

Thanks a lot for your answer. The first step I took was to execute a Flink job
through the Flink REST API. While the Flink job runs, the Flowable process
instance checks the status of the job through the Flink REST
API. The process instance continues once the job is finished.
I am now working on an example in which the Flink job, at the end, sends a
signal to the process instance (through the Flowable REST API) and the process
instance continues execution (I do not expect any issue there). In
that case the Flink job needs some additional code to send the signal to Flowable.

What I want to achieve is something like a job wrapper which, at the end of
the wrapped Flink job's execution, sends a signal to Flowable so that the
process instance continues. That way the original Flink job does not
need any modification and can easily be integrated into the process execution.


FlinkJobWrapper
 ____________________________
|                            |
|  OriginalFlinkJob          |
|  (runs unchanged)          |
|____________________________|
|                            |
|  Send signal to Flowable   |
|____________________________|


The question is: how do I do that?
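
One possible shape for such a wrapper, as a rough sketch only: it runs the
wrapped job's main() unchanged (env.execute() inside it blocks until the job
finishes when the job is submitted in attached mode) and then calls Flowable
over HTTP. The Flowable URL and the signal payload are placeholders; check the
Flowable REST documentation for the exact signal call.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FlinkJobWrapper {

    public static void main(String[] args) throws Exception {
        // args[0]: fully-qualified main class of the original Flink job;
        // the remaining args are passed through to it untouched.
        Class<?> jobClass = Class.forName(args[0]);
        String[] jobArgs = Arrays.copyOfRange(args, 1, args.length);

        // Run the wrapped job; this returns once the job has finished.
        jobClass.getMethod("main", String[].class).invoke(null, (Object) jobArgs);

        // Signal the waiting Flowable process instance (placeholder endpoint).
        notifyFlowable("http://flowable-host:8080/flowable-rest/service/runtime/executions/EXEC_ID");
    }

    private static void notifyFlowable(String executionUrl) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(executionUrl).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        byte[] body = "{\"action\":\"signal\"}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        System.out.println("Flowable responded with HTTP " + conn.getResponseCode());
    }
}

The wrapper jar would then be submitted to Flink instead of the original jar,
with the original main class passed as the first program argument.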

And that's all. (simulations are the next step)

Thank you in advance for your answer.
Regards
Martin

On 18 January 2018 at 19:30, Maciek Próchniak  wrote:

> Hi Martin,
>
>
> I did some activiti development so your mail caught my attention :)
>
> I don't think I understand what you are trying to achieve - where is the
> process you're simulating, where is the simulation running, and where is the place
> for Flink? Do you want to invoke Flink (a batch job, I suppose?) from a Flowable
> process? Or do you want to run simulations of a BPMN process as a Flink job?
>
> thanks,
>
> maciek
>
>
> On 16/01/2018 22:29, Martin Grofčík wrote:
>
> Hi,
>
> I want to implement flowable (BPMN platform  - www.flowable.org) <->
> flink integration module. The motivation is to execute process simulations
> with flink (simple simulation experiment example
> https://gromar01.wordpress.com/2017/11/07/will-we-meet-our-kpis/). I was
> able to create
>
>
> Flink provides REST API through which I can easily create a job and
> monitor its execution.
> wordCountProcess.PNG
> (15K)
>
> 
>
>
> at the end I can encapsulate whole process into one task (e.g. Execute
> flink job) which will do the same in java code.
> In fact I have no experience with flink that's why I can imagine only
> process steps to:
> 1. create flink job
> 2. monitor its state
>
> Question 1:
> Can you propose another useful process steps? (e.g.  to download results,
> upload datasets, .)
> (Provide me a link how I can proceed with their implementation, please)
>
> Question 2:
> The problem with the process is that it is always checking job state. I
> would prefer to add a hook at the end of flink job execution to call
> flowable rest API to notify process instance about the job finished
> (failed) events.
> The way which I have found is to implement rest end point org.apache.flink.
> runtime.webmonitor.handlers.JarPlanHandler which calls flowable rest api
> at the end of flink job execution.
> What I would prefer is to make something like wrapper around the main
> class to execute flowable rest call at the end.
> Can you provide me a hint how to implement this wrapper please?
>
> Thank you in advance for the answer.
>
> Regards
> Martin
>
>
>


Re: flowable <-> flink integration

2018-01-18 Thread Maciek Próchniak

Hi Martin,


I did some activiti development so your mail caught my attention :)

I don't think I understand what you are trying to achieve - where is the
process you're simulating, where is the simulation running, and where is the
place for Flink? Do you want to invoke Flink (a batch job, I suppose?) from a
Flowable process? Or do you want to run simulations of a BPMN process as a
Flink job?



thanks,

maciek


On 16/01/2018 22:29, Martin Grofčík wrote:

Hi,

I want to implement a Flowable (BPMN platform - www.flowable.org) <-> Flink
integration module. The motivation is to execute process simulations with
Flink (simple simulation experiment example:
https://gromar01.wordpress.com/2017/11/07/will-we-meet-our-kpis/). I
was able to create



Flink provides REST API through which I can easily create a job and 
monitor its execution.

wordCountProcess.PNG
(15K)



at the end I can encapsulate whole process into one task (e.g. Execute 
flink job) which will do the same in java code.
In fact I have no experience with flink that's why I can imagine only 
process steps to:

1. create flink job
2. monitor its state

Question 1:
Can you propose another useful process steps? (e.g.  to download 
results, upload datasets, .)

(Provide me a link how I can proceed with their implementation, please)

Question 2:
The problem with the process is that it is always checking job state. 
I would prefer to add a hook at the end of flink job execution to call 
flowable rest API to notify process instance about the job finished 
(failed) events.
The way which I have found is to implement rest end 
point org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler 
which calls flowable rest api at the end of flink job execution.
What I would prefer is to make something like wrapper around the main 
class to execute flowable rest call at the end.

Can you provide me a hint how to implement this wrapper please?

Thank you in advance for the answer.

Regards
Martin




Re: Submitting jobs via Java code

2018-01-18 Thread Luigi Sgaglione
Solved.

This is the correct code to deploy a job programmatically via the REST API.

Thanks

URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();

String boundaryString = "--Boundary";
String crlf = "\r\n";
String fileUrl = "Test-1.jar";
File jarToUpload = new File(fileUrl);

urlConnection.setUseCaches(false);
urlConnection.setDoOutput(true);
urlConnection.setDoInput(true);

urlConnection.setRequestMethod("POST");
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Cache-Control", "no-cache");
urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);

OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
BufferedWriter httpRequestBodyWriter =
        new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

// Include the section to describe the file.
// The working version prepends "--" to the boundary here and (further down)
// terminates the body with "--" + boundary + "--", as multipart/form-data requires.
String payloadString = "--" + boundaryString + crlf + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\"" + crlf
        + "Content-Type: application/x-java-archive" + crlf + crlf;
System.out.println(payloadString);
httpRequestBodyWriter.write(payloadString);
httpRequestBodyWriter.flush();

// Write the actual file contents
FileInputStream inputStream = new FileInputStream(jarToUpload);

int bytesRead;
byte[] dataBuffer = new byte[1024];
while ((bytesRead = inputStream.read(dataBuffer)) != -1) {
    outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
}

outputStreamToRequestBody.flush();

// Close the multipart body with the final "--boundary--" marker.
httpRequestBodyWriter.write(crlf + "--" + boundaryString + "--" + crlf);
httpRequestBodyWriter.flush();

inputStream.close();
outputStreamToRequestBody.close();
httpRequestBodyWriter.close();

// Read and print the server's response.
BufferedReader httpResponseReader =
        new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String lineRead;
while ((lineRead = httpResponseReader.readLine()) != null) {
    System.out.println(lineRead);
}

Best Regards
Luigi

2018-01-18 17:02 GMT+01:00 Luigi Sgaglione :

> Hi Timo,
>
> I think that the REST API is the most suitable solution. Thanks.
>
> So, I'm trying to use the Flink REST API and I'm able to perform get
> request but not the post one.
>
> In particular, when I issue a POST to upload the jar I receive this error
> from the server: {"error": "Failed to upload the file."}
>
> this is the used code:
>
>
>  URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
>
> HttpURLConnection urlConnection = (HttpURLConnection)
> serverUrl.openConnection();
>
> String boundaryString = "--Boundary";
> String crlf = "\r\n";
> String fileUrl = "Test-1.jar";
> File jarToUpload = new File(fileUrl);
>
> urlConnection.setDoOutput(true);
> urlConnection.setRequestMethod("POST");
> urlConnection.setRequestProperty("Connection", "Keep-Alive");
> urlConnection.setRequestProperty("Cache-Control", "no-cache");
> urlConnection.addRequestProperty("Content-Type", "multipart/form-data;
> boundary=" + boundaryString);
>
> OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
> BufferedWriter httpRequestBodyWriter =
> new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));
>
> String payloadString = boundaryString + crlf + "Content-Disposition:
> form-data;"
> + " name=\"jarfile\";"
> + " filename=\"Test-1.jar\""+crlf
> + "Content-Type: application/x-java-archive"+crlf+crlf;
> System.out.println(payloadString);
> httpRequestBodyWriter.write(payloadString);
> httpRequestBodyWriter.flush();
>
> // Write the actual file contents
> FileInputStream inputStream = new FileInputStream(jarToUpload);
>
> int bytesRead;
> byte[] dataBuffer = new byte[1024];
> while((bytesRead = inputStream.read(dataBuffer)) != -1) {
> outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
> }
>
> outputStreamToRequestBody.flush();
>
> httpRequestBodyWriter.write(boundaryString +crlf);
> httpRequestBodyWriter.flush();
>
> inputStream.close();
> outputStreamToRequestBody.close();
> httpRequestBodyWriter.close();
> BufferedReader httpResponseReader =
> new BufferedReader(new InputStreamReader(
> urlConnection.getInputStream()));
> String lineRead;
> while((lineRead = httpResponseReader.readLine()) != null) {
> System.out.println(lineRead);
> }
>
> The documentation of the Flink REST API is not very detailed; more precisely, it
> doesn't include a clear example.
>
> Do you have any idea to solve the error?
>
>
> thanks
>
> 2018-01-18 12:54 GMT+01:00 Timo Walther :
>
>> Hi Luigi,
>>
>> I'm also working on a solution for submitting jobs programmatically. You
>> can look into my working branch [1]. As far as I know, the best and most
>> stable solution is using the ClusterClient. But this is internal API and
>> might change.
>>
>> You could also use Flink's REST API for submitting a job [2].
>>
>> Regards,
>> Timo
>>
>> [1] https://github.com/twalthr/flink/blob/FLIN

Re: Submitting jobs via Java code

2018-01-18 Thread Luigi Sgaglione
Hi Timo,

I think that the REST API is the most suitable solution. Thanks.

So, I'm trying to use the Flink REST API and I'm able to perform get
request but not the post one.

In particular, when I issue a POST to upload the jar I receive this error
from the server: {"error": "Failed to upload the file."}

this is the used code:


 URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");

HttpURLConnection urlConnection = (HttpURLConnection)
serverUrl.openConnection();

String boundaryString = "--Boundary";
String crlf = "\r\n";
String fileUrl = "Test-1.jar";
File jarToUpload = new File(fileUrl);

urlConnection.setDoOutput(true);
urlConnection.setRequestMethod("POST");
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Cache-Control", "no-cache");
urlConnection.addRequestProperty("Content-Type", "multipart/form-data;
boundary=" + boundaryString);

OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
BufferedWriter httpRequestBodyWriter =
new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

String payloadString = boundaryString + crlf + "Content-Disposition:
form-data;"
+ " name=\"jarfile\";"
+ " filename=\"Test-1.jar\""+crlf
+ "Content-Type: application/x-java-archive"+crlf+crlf;
System.out.println(payloadString);
httpRequestBodyWriter.write(payloadString);
httpRequestBodyWriter.flush();

// Write the actual file contents
FileInputStream inputStream = new FileInputStream(jarToUpload);

int bytesRead;
byte[] dataBuffer = new byte[1024];
while((bytesRead = inputStream.read(dataBuffer)) != -1) {
outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
}

outputStreamToRequestBody.flush();

httpRequestBodyWriter.write(boundaryString +crlf);
httpRequestBodyWriter.flush();

inputStream.close();
outputStreamToRequestBody.close();
httpRequestBodyWriter.close();
BufferedReader httpResponseReader =
new BufferedReader(new
InputStreamReader(urlConnection.getInputStream()));
String lineRead;
while((lineRead = httpResponseReader.readLine()) != null) {
System.out.println(lineRead);
}

The documentation of the Flink REST API is not very detailed; more precisely, it
doesn't include a clear example.

Do you have any idea to solve the error?


thanks

2018-01-18 12:54 GMT+01:00 Timo Walther :

> Hi Luigi,
>
> I'm also working on a solution for submitting jobs programmatically. You
> can look into my working branch [1]. As far as I know, the best and most
> stable solution is using the ClusterClient. But this is internal API and
> might change.
>
> You could also use Flink's REST API for submitting a job [2].
>
> Regards,
> Timo
>
> [1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/
> flink-libraries/flink-sql-client/src/main/java/org/
> apache/flink/table/client/gateway/LocalExecutor.java
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/monitoring/rest_api.html#submitting-programs
>
> Am 1/18/18 um 11:41 AM schrieb Luigi Sgaglione:
>
> Hi Timo,
> my objective is to create a web interface that allows me to edit and
> deploy jobs on Flink.
>
> To do so I'm evaluating all possibilities provided by Flink APIs.
>
> What do you think that is the best solution?
>
> Thanks
> Luigi
>
> Il 18/gen/2018 09:39, "Timo Walther"  ha scritto:
>
>> Hi Luigi,
>>
>> can you try to load an entire configuration file via
>> GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us
>> a little bit what you want to achieve?
>>
>> Is the programmatic submission a requirement for you? Did you consider
>> using the RemoteStreamEnvironment?
>>
>> Regards,
>> Timo
>>
>>
>> Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
>>
>> Hi,
>>
>> I am a beginner in Flink and I'm trying to deploy a simple example using
>> a java client in a remote Flink server (1.4.0).
>>
>> I'm using org.apache.flink.client.program.Client
>>
>> this is the used code:
>>
>> Configuration config = new Configuration();
>> config.setString("jobmanager.rpc.address", "192.168.149.130");
>> config.setInteger("jobmanager.rpc.port", 6123);
>>
>> Client c = new Client(config);
>>
>> PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
>> c.runDetached(prg, 1);
>>
>>
>> but when I try to deploy the jar I receive the following error:
>>
>> 16:03:20,035 INFO  org.apache.flink.client.program.Client
>> - Looking up JobManager
>> Exception in thread "main" 
>> org.apache.flink.client.program.ProgramInvocationException:
>> *Failed to retrieve the JobManager gateway.*
>> at org.apache.flink.client.program.Client.runDetached(Client.java:380)
>> at org.apache.flink.client.program.Client.runDetached(Client.java:355)
>> at org.apache.flink.client.program.Client.runDetached(Client.java:340)
>> at org.apache.flink.client.program.ContextEnvironment.execute(C
>> ontextEnvironment.java:74)
>> at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu
>> tionEnvironment.java:804)
>> at org.apache

Re: Scheduling of GroupByKey and CombinePerKey operations

2018-01-18 Thread Fabian Hueske
Hi Pawel,

This question might be better suited for the Beam user list.
Beam includes the Beam Flink runner which translates Beam programs into
Flink programs.

Best,
Fabian

2018-01-18 16:02 GMT+01:00 Pawel Bartoszek :

> Can I ask why some operations run in only one slot? I understand that file
> writes should happen in only one slot, but the GroupByKey operation could be
> distributed across all slots. I am seeing around 20k distinct keys every
> minute. Is there any way to break this operator chain?
>
> I noticed that CombinePerKey operations that don't have IO related
> transformation are scheduled across all 32 slots.
>
>
> My cluster has 32 slots across 2 task managers. Running Beam 2.2 and
> Flink 1.3.2.
>
> 2018-01-18, 13:56:28 2018-01-18, 14:37:14 40m 45s GroupByKey ->
> ParMultiDo(WriteShardedBundles) -> ParMultiDo(Anonymous) ->
> xxx.pipeline.output.io.file.WriteWindowToFile-SumPlaybackBitrateResult2/
> TextIO.Write/WriteFiles/Reshuffle/Window.Into()/Window.Assign.out ->
> ParMultiDo(ReifyValueTimestamp) -> ToKeyedWorkItem 149 MB 333,672 70.8 MB
> 19 32
> 0032
> RUNNING
>
> Start TimeEnd TimeDurationBytes receivedRecords receivedBytes sentRecords
> sentAttemptHostStatus
> 2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
> 2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
> 2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
> 2018-01-18, 13:56:28 40m 45s 77.5 MB 333,683 2.21 MB 20 1 xxx RUNNING
> 2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
> 2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
>
> Thanks,
> Pawel
>


Re: Far too few watermarks getting generated with Kafka source

2018-01-18 Thread William Saar
Hi,
The watermark does not seem to get updated at all after the first one
is emitted. We used to get out-of-order warnings, but we changed the
job to use a bounded timestamp extractor, so we no longer get those
warnings.

Our timestamp extractor looks like this

class TsExtractor[T](time: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time) {
  override def extractTimestamp(element: Timestamped[T]): Long = element.timestamp
}

Our stream topology starts with a single stream, then we do two
separate flat map and filtering operations on the initial stream to
transform data batches into streams of two different event types. We then
assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20)))
for each event type on both branches before unioning the two branches to a
single stream again (the reason for the split is that the data used to come
from two different topics).
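
For reference, a Java sketch of the alternative Gary suggests below, i.e.
assigning the extractor directly on the Kafka source so that watermarks are
tracked per Kafka partition (the 0.11 consumer, the topic name, and the
epoch-millis string payload are assumptions made only for the example):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceWatermarksSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "watermark-demo");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props);

        // Assigning the extractor on the consumer itself makes Flink track a
        // watermark per Kafka partition; here each record is assumed to be a
        // plain epoch-millis timestamp string, purely for illustration.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(20)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.parseLong(element);
                    }
                });

        env.addSource(consumer).print();
        env.execute("kafka-per-partition-watermarks");
    }
}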

William

- Original Message -
From:
 "Gary Yao" 

To:
"William Saar" 
Cc:
"user" 
Sent:
Thu, 18 Jan 2018 11:11:17 +0100
Subject:
Re: Far too few watermarks getting generated with Kafka source

Hi William,

How often does the Watermark get updated? Can you share your code that
generates
the watermarks? Watermarks should be strictly ascending. If your code
produces
watermarks that are not ascending, smaller ones will be discarded.
Could it be
that the events in Kafka are more "out of order" with respect to event
time than
in your file?

You can assign timestamps in the Kafka source or later. The Flink
documentation
has a section on why it could be beneficial to assign Watermarks in
the Kafka
source:

 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
[1]

Best,
Gary

On Wed, Jan 17, 2018 at 5:15 PM, William Saar 
 wrote:
Hi,
I have a job where we read data from either Kafka or a file (for
testing), decode the entries and flat map them into events, and then
add a timestamp and watermark assigner to the events in a later
operation. This seems to generate periodic watermarks when running
from a file, but when Kafka is the source we barely get any watermark
updates. What could be causing this? (the environment has
setAutowatermarkInterval(1000))

Do we need to do all the timestamp and watermark assignment in the
Kafka source? or should it work to do it in later operations? The
events do seem to get propagated through the pipeline, we're just not
getting watermarks...

Thanks,
William

 

Links:
--
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
[2] mailto:will...@saar.se



Scheduling of GroupByKey and CombinePerKey operations

2018-01-18 Thread Pawel Bartoszek
Can I ask why some operations run in only one slot? I understand that file
writes should happen in only one slot, but the GroupByKey operation could be
distributed across all slots. I am seeing around 20k distinct keys every
minute. Is there any way to break this operator chain?

I noticed that CombinePerKey operations that don't have IO related
transformation are scheduled across all 32 slots.


My cluster has 32 slots across 2 task managers. Running Beam 2.2 and Flink
1.3.2.

2018-01-18, 13:56:28 2018-01-18, 14:37:14 40m 45s GroupByKey ->
ParMultiDo(WriteShardedBundles) -> ParMultiDo(Anonymous) ->
xxx.pipeline.output.io.file.WriteWindowToFile-SumPlaybackBitrateResult2/TextIO.Write/WriteFiles/Reshuffle/Window.Into()/Window.Assign.out
-> ParMultiDo(ReifyValueTimestamp) -> ToKeyedWorkItem 149 MB 333,672 70.8 MB
19 32
0032
RUNNING

Start TimeEnd TimeDurationBytes receivedRecords receivedBytes sentRecords
sentAttemptHostStatus
2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
2018-01-18, 13:56:28 40m 45s 77.5 MB 333,683 2.21 MB 20 1 xxx RUNNING
2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING
2018-01-18, 13:56:28 40m 45s 2.30 MB 0 2.21 MB 0 1 xxx RUNNING

Thanks,
Pawel


RE: Multiple Elasticsearch sinks not working in Flink

2018-01-18 Thread Teena Kappen // BPRISE
Hi Timo,

It works fine when the second sink is a Cassandra Sink. The data gets read from 
KafkaTopic2 and it gets written to Cassandra as expected.

Regards,
Teena

From: Timo Walther [mailto:twal...@apache.org]
Sent: 18 January 2018 18:41
To: user@flink.apache.org
Subject: Re: Multiple Elasticsearch sinks not working in Flink

Hi Teena,

what happens if you replace the second sink with a non-ElasticSearchSink? Do
you get the same result? Is the data read from KafkaTopic2?

We should determine which system is the bottleneck.

Regards,
Timo


Am 1/18/18 um 9:53 AM schrieb Teena Kappen // BPRISE:
Hi,

I am running flink 1.4 in single node. My job has two Kafka consumers reading 
from separate topics. After fetching the data, the job writes it to two 
separate Elasticsearch sinks. So the process is like this

KafkaTopic1 -> Kafkaconsumer1 -> create output record -> Elasticsearchsink1
KafkaTopic2 -> Kafkaconsumer2 -> create output record -> Elasticsearchsink2

Both the streams and their processing are completely unrelated. The first sink 
works as expected and it writes the output for all input records. The second 
sink writes to Elasticsearch only once and after that it stops writing to 
Elasticsearch even if there is more data that gets fed into Kafka. Sometimes, 
it does not even write once. We tested this in two other jobs and the same 
issue is there in all of them.

I have attached a sample code I had created to illustrate the issue. We are 
using Elasticsearch version 5.6.4 and hence the dependency used is 
'flink-connector-elasticsearch5_2.11'.

Regards,
Teena








Re: Flink CEP exception during RocksDB update

2018-01-18 Thread Kostas Kloudas
Thanks a lot Varun!

Kostas

> On Jan 17, 2018, at 9:59 PM, Varun Dhore  wrote:
> 
> Thank you Kostas. Since this error is not easily reproducible on my end I’ll 
> continue testing this and confirm the resolution once I am able to do so.
> 
> Thanks,
> Varun 
> 
> Sent from my iPhone
> 
> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas  > wrote:
> 
>> Hi Varun,
>> 
>> This can be related to this issue: 
>> https://issues.apache.org/jira/browse/FLINK-8226 
>> 
>> which is currently fixed on the master.
>> 
>> Could you please try the current master to see if the error persists?
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore >> > wrote:
>>> 
>>> 
>>> 
 Hello Flink community,
  
 I have encountered following exception while testing 1.4.0 release. This 
 error is occurring intermittently and my CEP job keeps restarting after 
 this exception. I am running the job with Event time semantics and 
 checkpoints enabled.
  
  
 java.lang.RuntimeException: Exception occurred while 
 processing valve output watermark:
 at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
 at 
 org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
 at 
 org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
 at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
 at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
 at 
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.RuntimeException: Error while adding data 
 to RocksDB
 at 
 org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
 at 
 org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
 at 
 org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
 at 
 org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
 at 
 org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
 at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
 at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
 ... 7 more
 Caused by: java.lang.IllegalStateException: Could not find id 
 for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, 
 timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 
 2)
 at 
 org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
 at 
 org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
 at 
 org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
 at 
 org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
 at 
 org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
 at 
 org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
 ... 13 more
  
  
 Thanks,
 Varun
>> 



Re: Multiple Elasticsearch sinks not working in Flink

2018-01-18 Thread Timo Walther

Hi Teena,

what happens if you replace the second sink with a
non-ElasticSearchSink? Do you get the same result? Is the data read from
KafkaTopic2?


We should determine which system is the bottleneck.

Regards,
Timo


Am 1/18/18 um 9:53 AM schrieb Teena Kappen // BPRISE:


Hi,

I am running flink 1.4 in single node. My job has two Kafka consumers 
reading from separate topics. After fetching the data, the job writes 
it to two separate Elasticsearch sinks. So the process is like this


KafkaTopic1 -> Kafkaconsumer1 -> create output record -> 
Elasticsearchsink1


KafkaTopic2 -> Kafkaconsumer2 -> create output record -> 
Elasticsearchsink2


Both the streams and their processing are completely unrelated. The 
first sink works as expected and it writes the output for all input 
records. The second sink writes to Elasticsearch only once and after 
that it stops writing to Elasticsearch even if there is more data that 
gets fed into Kafka. Sometimes, it does not even write once. We tested 
this in two other jobs and the same issue is there in all of them.


I have attached a sample code I had created to illustrate the issue. 
We are using Elasticsearch version 5.6.4 and hence the dependency used 
is ‘flink-connector-elasticsearch5_2.11’.


Regards,

Teena





Re: Which collection to use in Scala case class

2018-01-18 Thread Timo Walther
I filed a more specific issue for this: 
https://issues.apache.org/jira/browse/FLINK-8451


Am 1/18/18 um 10:47 AM schrieb shashank agarwal:


@Chesnay, @Timo: yes, it's a simple case class which I am using with
java.util.List, and one case class with Option and Seq, with CEP.


I have also filed Jira bugs for that and put the logs there.

https://issues.apache.org/jira/browse/FLINK-7760

I also have an issue with RocksDB checkpointing; I have filed this bug
as well:


https://issues.apache.org/jira/browse/FLINK-7756






On Wed, Jan 17, 2018 at 2:55 PM, shashank agarwal 
mailto:shashank...@gmail.com>> wrote:


Hello,

A quick question: which Scala collection should I use in my Scala
case class so that it won't go through the generic serializer?

I was using java.util.List in my Scala case class before. Will
this create a problem with savepoint and restore? My
restore is not working, so I am trying to replace that collection
with another Scala collection.

Even PatternStream in CEP uses Range, which goes through the generic
serializer.

When I put "env.getConfig.disableGenericTypes" in my program, CEP
gives an error. After analyzing, I found that Range is used in
PatternStream, so it's throwing the error.

-- 
Thanks Regards


SHASHANK AGARWAL
 ---  Trying to mobilize the things








--
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things





Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

2018-01-18 Thread Timo Walther

I filed an issue for this: https://issues.apache.org/jira/browse/FLINK-8451

Am 1/12/18 um 4:40 PM schrieb Seth Wiesman:

Here is the stack trace:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.$colon$colon to field 
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.fieldNames of type 
scala.collection.Seq in instance of 
com.mediamath.reporting.PerformanceJob$$anon$3
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
... 4 more


Seth Wiesman| Software Engineer
4 World Trade Center, 46th Floor, New 
York, NY 10007
swies...@mediamath.com


  


On 1/12/18, 9:12 AM, "Tzu-Li (Gordon) Tai"  wrote:

 Hi Seth,
 
 Thanks a lot for the report!
 
 I think your observation is expected behaviour, if there really is a binary
 incompatible change between Scala minor releases.
 And yes, the type information macro in the Scala API is very sensitive to
 the exact Scala version used. I had in the past also observed generated case
 class serializers by the macro to be incompatible across different Scala
 minor releases.

 Just curious, what exactly is the deserialization failure you observed when
 using parent-first classloading?
 Perhaps we should properly document these surprises somewhere in the
 documentation ...
 
 Cheers,

 Gordon
 
 
 
 
 
 --

 Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
 





Re: Submitting jobs via Java code

2018-01-18 Thread Timo Walther

Hi Luigi,

I'm also working on a solution for submitting jobs programmatically. You 
can look into my working branch [1]. As far as I know, the best and most 
stable solution is using the ClusterClient. But this is internal API and 
might change.


You could also use Flink's REST API for submitting a job [2].

Regards,
Timo

[1] 
https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs
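
For completeness, a minimal sketch of the RemoteStreamEnvironment approach
mentioned in the quoted message below (the host, port, and jar path are
placeholders; the jar must contain the job's user code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Connects to a running JobManager and ships the given jar together
        // with the job graph when execute() is called.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "192.168.149.130", 6123, "target/Test-1.jar");

        // Placeholder pipeline, just so the sketch runs on its own.
        env.fromElements("a", "b", "c").print();

        env.execute("remote-submission-example");
    }
}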


Am 1/18/18 um 11:41 AM schrieb Luigi Sgaglione:

Hi Timo,
my objective is to create a web interface that allows me to edit and 
deploy jobs on Flink.


To do so I'm evaluating all possibilities provided by Flink APIs.

What do you think that is the best solution?

Thanks
Luigi

Il 18/gen/2018 09:39, "Timo Walther" > ha scritto:


Hi Luigi,

can you try to load an entire configuration file via
GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you
tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you
consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:

Hi,

I am a beginner in Flink and I'm trying to deploy a simple
example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:

Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new
File("target/Test-1.jar"));
c.runDetached(prg, 1);


but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO org.apache.flink.client.program.Client        
          - Looking up JobManager
Exception in thread "main"
org.apache.flink.client.program.ProgramInvocationException:
*Failed to retrieve the JobManager gateway.*
at
org.apache.flink.client.program.Client.runDetached(Client.java:380)
at
org.apache.flink.client.program.Client.runDetached(Client.java:355)
at
org.apache.flink.client.program.Client.runDetached(Client.java:340)
at

org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at

org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at
org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by:
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException:
*Could not retrieve the leader gateway*
at

org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at
org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at
org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed
out after [10 milliseconds]
at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at

scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at

org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks







Re: BucketingSink broken in flink 1.4.0 ?

2018-01-18 Thread Stephan Ewen
Re-posting the solution here from other threads:

You can fix this by either

  - Removing all Hadoop dependencies from your user jar
  - Setting the framework back to parent-first classloading (a minimal
    flink-conf.yaml snippet follows below): https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html#configuring-classloader-resolution-order
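
For the second option, the switch goes into flink-conf.yaml (sketch only; the
key is the one described on the linked page, and the default in 1.4 is
child-first):

# flink-conf.yaml
classloader.resolution-order: parent-first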

Hope that helps.

On Wed, Jan 10, 2018 at 3:28 PM, Kyle Hamlin  wrote:

> I'm having similar issues after moving from 1.3.2 to 1.4.0.
>
> My mailing list thread: BucketingSink doesn't work anymore moving from
> 1.3.2 to 1.4.0
>
>
> I'm not actually using hdfs as my sink. I'll be using s3 as my final sink
> but I get the following error even when I've given a local file path to the
> BucketingSink.
>
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initializeState(BucketingSink.java:358)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.
> tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.
> restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:259)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI:
> hdfs://localhost:12345/
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(
> HadoopFsFactory.java:187)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:401)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.
> createHadoopFileSystem(BucketingSink.java:1154)
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initFileSystem(BucketingSink.java:411)
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initializeState(BucketingSink.java:355)
> ... 9 more
> Caused by: java.lang.ClassCastException
>
>
>
>
>
>
> On Wed, Jan 10, 2018 at 1:39 PM Chesnay Schepler 
> wrote:
>
>> Your analysis looks correct; the code in question will never properly
>> detect Hadoop file systems. I'll open a JIRA.
>>
>> Your suggestion to replace it with getUnguardedFileSystem() was my first
>> instinct as well.
>>
>> Good job debugging this.
>>
>>
>> On 10.01.2018 14:17, jelmer wrote:
>>
>> Hi, I am trying to convert some jobs from Flink 1.3.2 to Flink 1.4.0.
>>
>> But I am running into the issue that the BucketingSink will always try
>> to connect to hdfs://localhost:12345/ instead of the HDFS URL I have
>> specified in the constructor.
>>
>> If I look at the code at
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
>>
>>
>> It tries to create the Hadoop filesystem like this:
>>
>> final org.apache.flink.core.fs.FileSystem flinkFs =
>> org.apache.flink.core.fs.FileSystem.get(path.toUri());
>> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
>> ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
>>
>> But FileSystem.get will always return a SafetyNetWrapperFileSystem, so the
>> instanceof check will never indicate that it's a Hadoop filesystem.
>>
>>
>> Am I missing something, or is this a bug? If so, what would be the
>> correct fix? I guess replacing FileSystem.get with FileSystem.getUnguardedFileSystem
>> would fix it, but I am afraid I lack the context to know whether that would be
>> safe.
>>
>>
>>


Re: Submitting jobs via Java code

2018-01-18 Thread Luigi Sgaglione
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy
jobs on Flink.

To do so, I'm evaluating all the possibilities provided by the Flink APIs.

What do you think is the best solution?

Thanks

2018-01-18 9:39 GMT+01:00 Timo Walther :

> Hi Luigi,
>
> can you try to load an entire configuration file via
> GlobalConfiguration.loadConfiguration(flinkConfigDir)? Maybe you can tell us
> a little bit about what you want to achieve?
>
> Is the programmatic submission a requirement for you? Did you consider
> using the RemoteStreamEnvironment?
>
> Regards,
> Timo
>
>
> On 1/17/18 at 5:08 PM, Luigi Sgaglione wrote:
>
> Hi,
>
> I am a beginner with Flink and I'm trying to deploy a simple example using a
> Java client to a remote Flink server (1.4.0).
>
> I'm using org.apache.flink.client.program.Client
>
> this is the used code:
>
> Configuration config = new Configuration();
> config.setString("jobmanager.rpc.address", "192.168.149.130");
> config.setInteger("jobmanager.rpc.port", 6123);
>
> Client c = new Client(config);
>
> PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
> c.runDetached(prg, 1);
>
>
> but when I try to deploy the jar I receive the following error:
>
> 16:03:20,035 INFO  org.apache.flink.client.program.Client
> - Looking up JobManager
> Exception in thread "main" 
> org.apache.flink.client.program.ProgramInvocationException:
> *Failed to retrieve the JobManager gateway.*
> at org.apache.flink.client.program.Client.runDetached(Client.java:380)
> at org.apache.flink.client.program.Client.runDetached(Client.java:355)
> at org.apache.flink.client.program.Client.runDetached(Client.java:340)
> at org.apache.flink.client.program.ContextEnvironment.execute(
> ContextEnvironment.java:74)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu
> tionEnvironment.java:804)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
> at flink.Job.main(Job.java:67)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at org.apache.flink.client.program.PackagedProgram.callMainMeth
> od(PackagedProgram.java:497)
> at org.apache.flink.client.program.PackagedProgram.invokeIntera
> ctiveModeForExecution(PackagedProgram.java:395)
> at org.apache.flink.client.program.Client.runDetached(Client.java:279)
> at flink.DeployJob.main(DeployJob.java:24)
> Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException:
> *Could not retrieve the leader gateway*
> at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveL
> eaderGateway(LeaderRetrievalUtils.java:102)
> at org.apache.flink.client.program.Client.getJobManagerGateway(
> Client.java:567)
> at org.apache.flink.client.program.Client.runDetached(Client.java:378)
> ... 15 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(B
> lockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:116)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveL
> eaderGateway(LeaderRetrievalUtils.java:100)
> ... 17 more
>
>
>
> Maybe I missed some configuration of the client.
> Can you help me to solve the problem?
>
> Thanks
>
>
>


Re: Far too few watermarks getting generated with Kafka source

2018-01-18 Thread Gary Yao
Hi William,

How often does the Watermark get updated? Can you share your code that
generates
the watermarks? Watermarks should be strictly ascending. If your code
produces
watermarks that are not ascending, smaller ones will be discarded. Could it
be
that the events in Kafka are more "out of order" with respect to event time
than
in your file?

You can assign timestamps in the Kafka source or later. The Flink
documentation
has a section on why it could be beneficial to assign Watermarks in the
Kafka
source:


https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
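
A minimal sketch of assigning the extractor on the consumer itself, so that
watermarks are tracked per Kafka partition (MyEvent, its getEventTime() field
and the 10 second out-of-orderness bound are placeholders, and "consumer"
stands for your FlinkKafkaConsumer instance):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                // placeholder: return the event-time field of your event type
                return event.getEventTime();
            }
        });

DataStream<MyEvent> events = env.addSource(consumer);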

Best,
Gary

On Wed, Jan 17, 2018 at 5:15 PM, William Saar  wrote:

> Hi,
> I have a job where we read data from either Kafka or a file (for testing),
> decode the entries and flat map them into events, and then add a timestamp
> and watermark assigner to the events in a later operation. This seems to
> generate periodic watermarks when running from a file, but when Kafka is
> the source we barely get any watermark updates. What could be causing this?
> (the environment has setAutoWatermarkInterval(1000))
>
> Do we need to do all the timestamp and watermark assignment in the Kafka
> source? Or should it work to do it in later operations? The events do seem
> to get propagated through the pipeline, we're just not getting watermarks...
>
> Thanks,
> William
>


Re: Which collection to use in Scala case class

2018-01-18 Thread shashank agarwal
@Chesnay, @Timo, yes, it's a simple case class which I am using with
java.util.List, and one case class with Option and Seq, with CEP.

I have also filed JIRA bugs for that and put the logs there.

https://issues.apache.org/jira/browse/FLINK-7760

I also have an issue with RocksDB checkpointing, for which I have filed this bug:

https://issues.apache.org/jira/browse/FLINK-7756






On Wed, Jan 17, 2018 at 2:55 PM, shashank agarwal 
wrote:

> Hello,
>
> A quick question: which Scala collection should I use in my Scala case
> class so that it won't go through the generic serializer?
>
> I was using java.util.List in my Scala case class before. Will this
> create a problem with savepoint and restore? My restore is not
> working, so I am trying to replace that collection with another Scala
> collection.
>
> Even PatternStream in CEP uses Range, which goes through the generic
> serializer.
>
> When I put env.getConfig.disableGenericTypes in my program, CEP gives an
> error. After analyzing it, I found Range is used in PatternStream, so it's
> throwing the error.
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>
>
>
>



-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Multiple Elasticsearch sinks not working in Flink

2018-01-18 Thread Teena Kappen // BPRISE
Hi,

I am running Flink 1.4 on a single node. My job has two Kafka consumers reading
from separate topics. After fetching the data, the job writes it to two
separate Elasticsearch sinks. So the process is like this:

KafkaTopic1 -> Kafkaconsumer1 -> create output record -> Elasticsearchsink1
KafkaTopic2 -> Kafkaconsumer2 -> create output record -> Elasticsearchsink2

Both streams and their processing are completely unrelated. The first sink
works as expected and writes the output for all input records. The second
sink writes to Elasticsearch only once and then stops writing, even when more
data is fed into Kafka. Sometimes it does not write even once. We tested this
in two other jobs and the same issue occurs in all of them.

I have attached sample code I created to illustrate the issue. We are using
Elasticsearch version 5.6.4, hence the dependency used is
'flink-connector-elasticsearch5_2.11'.

Regards,
Teena




public class ElasticSearchTest1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set Elasticsearch connection details
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", "");
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName(""), 9300));

        // set properties for Kafka streaming
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "" + ":9092");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "latest");

        // create consumer for log records
        FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
                new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);

        DataStream<RecordOne> firstStream = env
                .addSource(inputConsumer1)
                .flatMap(new CreateRecordOne());

        firstStream
                .addSink(new ElasticsearchSink<>(config, transports,
                        new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

        FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
                new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);

        DataStream<RecordTwo> secondStream = env
                .addSource(inputConsumer2)
                .flatMap(new CreateRecordTwo());

        secondStream
                .addSink(new ElasticsearchSink<>(config, transports,
                        new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));

        env.execute("Elastic Search Test");
    }
}

public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    String index;
    String type;

    // initialize with target index and mapping type
    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {

        // construct JSON document to index
        Map<String, Object> json = new HashMap<>();

        json.put("item_one", record.item1);
        json.put("item_two", record.item2);

        IndexRequest rqst = Requests.indexRequest()
                .index(index)   // index name
                .type(type)     // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    String index;
    String type;

    // initialize with target index and mapping type
    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {

        // construct JSON docu

Re: Submitting jobs via Java code

2018-01-18 Thread Timo Walther

Hi Luigi,

can you try to load an entire configuration file via
GlobalConfiguration.loadConfiguration(flinkConfigDir)? Maybe you can tell us
a little bit about what you want to achieve?


Is the programmatic submission a requirement for you? Did you consider 
using the RemoteStreamEnvironment?
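
As a rough sketch of that approach (untested; the host, port and jar path are
taken from your snippet below and may need adjusting):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // connects to the JobManager directly and ships the user-code jar(s)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "192.168.149.130",    // jobmanager.rpc.address
                6123,                 // jobmanager.rpc.port
                "target/Test-1.jar"); // jar containing the job classes

        env.fromElements("a", "b", "c").print();

        env.execute("remote-submit-sketch");
    }
}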


Regards,
Timo


On 1/17/18 at 5:08 PM, Luigi Sgaglione wrote:

Hi,

I am a beginner with Flink and I'm trying to deploy a simple example
using a Java client to a remote Flink server (1.4.0).


I'm using org.apache.flink.client.program.Client

this is the used code:

Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new
File("target/Test-1.jar"));
c.runDetached(prg, 1);


but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client             
          - Looking up JobManager
Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: *Failed to 
retrieve the JobManager gateway.*

at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)

at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)

at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
*Could not retrieve the leader gateway*
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at 
org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)

at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out 
after [10 milliseconds]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)

... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks