Re: How to use ParameterTool.fromPropertiesFile() to get a resource file inside my jar

2017-09-27 Thread Aljoscha Krettek
Hi Ron,

This is the issue; it has already been implemented for the upcoming 1.3.3 and 1.4.0 releases.

https://issues.apache.org/jira/browse/FLINK-7630 


Best,
Aljoscha

> On 26. Sep 2017, at 21:10, Ron Crocker  wrote:
> 
> What’s crazy is that I just stumbled on the same issue. Thanks for sharing!
> 
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com 
> M: +1 630 363 8835
> 
>> On Sep 15, 2017, at 7:30 AM, Tony Wei > > wrote:
>> 
>> Hi Aljoscha,
>> 
> Thanks for your reply. It looks great to have that feature. I will create a 
>> Jira issue for that and try to solve it.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-15 20:51 GMT+08:00 Aljoscha Krettek > >:
>> Hi,
>> 
>> I think calling getPath() on the URL returned from getResource() loses some 
>> of the information that is required to resolve the file in the jar. The 
>> solution should be to allow passing a "File" to 
>> ParameterTool.fromPropertiesFile() or to allow passing an InputStream to 
>> ParameterTool.fromPropertiesFile(). Passing a File should work because a 
>> File can be constructed from a URI and a URL can be turned into a URI.
>> 
>> Would you be interested in opening a Jira issue for that and working on it?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 15. Sep 2017, at 03:32, Tony Wei >> > wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I used Maven with this command "mvn clean package -Pbuild-jar" to create 
>>> the jar.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> Aljoscha Krettek <aljos...@apache.org> wrote on Thursday, 14 September 2017 at 18:24:
>>> Hi,
>>> 
>>> Are you using Maven to create the Jar or your IDE? I think this might be a 
>>> problem only when creating the Jar via the IDE.
>>> 
>>> Best,
>>> Aljoscha
>>> 
 On 11. Sep 2017, at 04:46, Tony Wei >>> > wrote:
 
 Hi Aljoscha,
 
 I found the root cause of my problem from this reference 
 https://stackoverflow.com/questions/18151072/cant-find-resource-file-after-exporting-to-a-runnable-jar
 So I changed the way I use ParameterTool. I read the configuration from an 
 InputStream, re-encoded it in argument format and used 
 ParameterTool.fromArgs() to parse it together with the other arguments.
 I'm not sure if this is a good solution. If you have any better one, 
 please let me know. Thanks for your help.
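
A minimal sketch of that workaround, assuming the file is packaged as
src/main/resources/application.conf and StreamJob is the job's main class
(names taken from this thread; the snippet would live in the main method):

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.java.utils.ParameterTool;

// Load the packaged properties file as a classpath resource instead of a file path.
Properties props = new Properties();
try (InputStream in = StreamJob.class.getClassLoader()
        .getResourceAsStream("application.conf")) {
    props.load(in);
}

// Re-encode each entry as "--key value" so that ParameterTool.fromArgs() can
// parse it; the program's own arguments can be appended to the same list.
List<String> argList = new ArrayList<>();
for (String key : props.stringPropertyNames()) {
    argList.add("--" + key);
    argList.add(props.getProperty(key));
}
ParameterTool parameters = ParameterTool.fromArgs(argList.toArray(new String[0]));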
 
 Best Regards,
 Tony Wei
 
 2017-09-08 23:40 GMT+08:00 Tony Wei >>> >:
 Hi Aljoscha,
 
 I have tried 
 `StreamJob.class.getClassLoader().getResource("application.conf").getPath()`,
  but I got this exception.
 
 Caused by: java.io.FileNotFoundException: Properties file 
 /home/tonywei/flink/file:/tmp/flink-web-24351e69-a261-45be-9503-087db8155a8f/d69a3ca9-bfa0-43ef-83e8-e15f38162a87_quickstart-0.1.jar!/application.conf
 
 Best Regards,
 Tony Wei
 
 2017-09-08 23:24 GMT+08:00 Aljoscha Krettek >>> >:
 Hi,
 
 How are you specifying the path for the properties file? Have you tried 
 reading the properties by using 
 this.getClass().getClassLoader().getResource()?
 
 Best,
 Aljoscha
 
 > On 8. Sep 2017, at 16:32, Tony Wei >>> > > wrote:
 >
 > Hi,
 >
 > I put my configuration file in `./src/main/resources/` and packed it 
 > inside my jar.
 > I want to run it on standalone cluster by using web UI to submit my job.
 > No matter which way I tried, the ParameterTool.fromPropertiesFile() 
 > couldn't find the file path, but threw `FileNotFoundException` instead.
 > Is there any best practice to deal with such problem? Thanks for your 
 > help.
 >
 > Best Regards,
 > Tony Wei
 
 
 
>>> 
>> 
>> 
> 



Re: Cannot deploy Flink on YARN

2017-09-27 Thread Aljoscha Krettek
Since you're running in a container, the question is whether the container 
where the JM is running can access the ZooKeeper at 10.200.0.6.
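
For example, a quick reachability check that could be run from inside the JM
container (a hypothetical snippet; 2181 is only ZooKeeper's default client port,
so adjust it to your setup):

import java.net.InetSocketAddress;
import java.net.Socket;

public class ZkReachabilityCheck {
    public static void main(String[] args) throws Exception {
        // Fails with an exception if the ZooKeeper endpoint cannot be reached
        // from this network namespace within 5 seconds.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("10.200.0.6", 2181), 5000);
            System.out.println("ZooKeeper endpoint is reachable");
        }
    }
}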

> On 27. Sep 2017, at 04:31, Sridhar Chellappa  wrote:
> 
> Emily,
> 
> I did not get a chance to capture the logs on the container. Since I have 
> erased the instances, I have lost access to the logs. I have moved to no-ha 
> mode (single master) and running OK.
> 
> Aljoscha,
> 
> Network connectivity is good. I am able to ssh to 10.200.0.6. 
> 
> 
> Will try the HA mode and capture all the logs and send them over
> 
> On Tue, Sep 26, 2017 at 6:37 PM, Aljoscha Krettek  > wrote:
> Is the IP 10.200.0.6 reachable form the machine that runs the JobManager?
> 
>> On 25. Sep 2017, at 19:58, Emily McMahon > > wrote:
>> 
>> What's in the container log for the container that failed? 
>> 
>> On Sep 11, 2017 2:17 AM, "Sridhar Chellappa" > > wrote:
>> I am trying to start Flink(Version 1.3.0) on YARN (Hadoop 2.8.1) by issuing 
>> the following command:
>> 
>> ~/flink-1.3.0/bin/yarn-session.sh -s 4 -n 10 -jm 4096 -tm 4096 -d
>> 
>> I am seeing a flurry of these Errors:
>> 
>> 2017-09-11 08:17:11,410 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>- Deployment took more than 60 seconds. Please check if the 
>> requested resources are available in the YARN cluster
>> 2017-09-11 08:17:11,661 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>- Deployment took more than 60 seconds. Please check if the 
>> requested resources are available in the YARN cluster
>> 2017-09-11 08:17:11,912 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>- Deployment took more than 60 seconds. Please check if the 
>> requested resources are available in the YARN cluster
>> 2017-09-11 08:17:12,163 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>- Deployment took more than 60 seconds. Please check if the 
>> requested resources are available in the YARN cluster
>> 
>> 
>> And then, my deployment fails with the following exception :
>> 
>> Error while deploying YARN cluster: Couldn't deploy Yarn cluster
>> java.lang.RuntimeException: Couldn't deploy Yarn cluster
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:439)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:630)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:486)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:483)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:483)
>> Caused by: 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
>> The YARN application unexpectedly switched to state FAILED during deployment.
>> Diagnostics from YARN: Application application_1504851547322_0003 failed 2 
>> times due to AM Container for appattempt_1504851547322_0003_02 exited 
>> with  exitCode: 31
>> Failing this attempt.Diagnostics: Exception from container-launch.
>> Container id: container_1504851547322_0003_02_01
>> Exit code: 31
>> Stack trace: ExitCodeException exitCode=31:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
>> at org.apache.hadoop.util.Shell.run(Shell.java:869)
>> at 
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:748)
>> 
>> 
>> 
>> Further Debugging at the JobManager logs shows :
>> 
>> Resetting connection and trying again with a new connection.
>> 2017-09-11 08:17:11,820 INFO  org.apache.zookeeper.ZooKeeper 
>>- Initiating client connection, 
>> connectString=hig

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-27 Thread Stefan Richter
Hi,

thanks for the information. Unfortunately, I have no immediate idea what the 
reason is from the given information. I think most helpful could be a thread 
dump, but also metrics on the operator level to figure out which part 
of the pipeline is the culprit.

Best,
Stefan

> On 26.09.2017 at 17:55, Tony Wei wrote:
> 
> Hi Stefan,
> 
> There is no unknown exception in my full log. The Flink version is 1.3.2.
> My job is roughly like this.
> 
> env.addSource(Kafka)
>   .map(ParseKeyFromRecord)
>   .keyBy()
>   .process(CountAndTimeoutWindow)
>   .asyncIO(UploadToS3)
>   .addSink(UpdateDatabase)
> 
> It seemed all tasks stopped like the picture I sent in the last email.
> 
> I will keep my eye on taking a thread dump from that JVM if this happens 
> again.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-26 23:46 GMT+08:00 Stefan Richter  >:
> Hi,
> 
> that is very strange indeed. I had a look at the logs and there is no error 
> or exception reported. I assume there is also no exception in your full logs? 
> Which version of flink are you using and what operators were running in the 
> task that stopped? If this happens again, would it be possible to take a 
> thread dump from that JVM?
> 
> Best,
> Stefan
> 
> > On 26.09.2017 at 17:08, Tony Wei wrote:
> >
> > Hi,
> >
> > Something weird happened on my streaming job.
> >
> > I found my streaming job seems to be blocked for a long time and I saw the 
> > situation like the picture below. (chk #1245 and #1246 were all finishing 
> > 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same 
> > state like #1247 until I restarted the TM.)
> >
> > 
> >
> > I'm not sure what happened, but the consumer stopped fetching records, 
> > buffer usage is 100% and the following task did not seem to fetch data 
> > anymore. Just like the whole TM was stopped.
> >
> > However, after I restarted the TM and forced the job to restart from the latest 
> > completed checkpoint, everything worked again. And I don't know how to 
> > reproduce it.
> >
> > The attachment is my TM log. Because there are many user logs and sensitive 
> > information, I only kept the log lines from `org.apache.flink...`.
> >
> > My cluster setting is one JM and one TM with 4 available slots.
> >
> > Streaming job uses all slots, checkpoint interval is 5 mins and max 
> > concurrent number is 3.
> >
> > Please let me know if it needs more information to find out what happened 
> > on my streaming job. Thanks for your help.
> >
> > Best Regards,
> > Tony Wei
> > 
> 
> 



Re: Exception in BucketingSink when cancelling Flink job

2017-09-27 Thread Stefan Richter
Hi,

I would speculate that the reason for this order is that we want to shut down 
the tasks quickly by interrupting blocking calls in the event of failure, so 
that recovery can begin as fast as possible. I am looping in Stephan who might 
give more details about this code.

Best,
Stefan 

> On 27.09.2017 at 07:33, wangsan wrote:
> 
> After digging into the source code, we found that when Flink job is canceled, 
> a TaskCanceler thread is created.
> 
> The TaskCanceler thread calls cancel() on the invokable and periodically 
> interrupts the
> task thread until it has terminated.
> 
> try {
>   invokable.cancel();
> } catch (Throwable t) {
>   logger.error("Error while canceling the task {}.", taskName, t);
> }
> // ...
> executer.interrupt();
> try {
>   executer.join(interruptInterval);
> } catch (InterruptedException e) {
>   // we can ignore this
> }
> // ...
> Notice that TaskCanceler first sends an interrupt signal to the task thread and 
> then calls the join method. Since the task thread is now trying to close the 
> DFSOutputStream, which is waiting for an ack, an InterruptedException is 
> thrown in the task thread.
> 
> synchronized (dataQueue) {
>   while (!streamerClosed) {
>     checkClosed();
>     if (lastAckedSeqno >= seqno) {
>       break;
>     }
>     try {
>       // when we receive an ack, we notify on dataQueue
>       dataQueue.wait(1000);
>     } catch (InterruptedException ie) {
>       throw new InterruptedIOException(
>           "Interrupted while waiting for data to be acknowledged by pipeline");
>     }
>   }
> }
> I was confused why TaskCanceler calls executer.interrupt() before 
> executer.join(interruptInterval). Can anyone help?
> 
> 
> 
> 
> 
> 
> Hi,
> 
> We are currently using BucketingSink to save data into HDFS in parquet 
> format. But when the Flink job was cancelled, we always got an exception in 
> BucketingSink's close method. The detailed exception info is as below:
> [ERROR] [2017-09-26 20:51:58,893] 
> [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
> of stream operator.
> java.io.InterruptedIOException: Interrupted while waiting for data to be 
> acknowledged by pipeline
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>   at 
> org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>   at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
> ...
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> 
> It seems that the DFSOutputStream hasn't been closed before the task thread is 
> forcefully terminated. We found a similar problem in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
> , but setting "akka.ask.timeout" to a larger value does not work for us. So 
> how can we make sure the stream is safely closed when cancelling a job?
> 
> Best,
> wangsan
> 
> 
> 
> 
> 



Re: Building scala examples

2017-09-27 Thread Nico Kruber
Hi Michael,
yes, it seems that the self-contained jars only contain the Java examples.

You may also follow the quickstart [1] to get started writing Flink streaming 
programs in Scala.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/
scala_api_quickstart.html

On Wednesday, 27 September 2017 05:04:31 CEST Michael Fong wrote:
> Thanks, Nico.
> 
> 
> I looked again at flink-examples-streaming_2.10-1.4-SNAPSHOT.jar, and it
> indeed contains both.
> 
> Originally I was looking at each self-contained jars as I used them as
> examples to create and run my own streaming program. They only contain
> compiled Java classes, if I am not mistaken.
> 
> Let me try to create a scala example with similar build procedure.
> 
> Thanks!
> 
> 
> On Mon, Sep 25, 2017 at 10:41 PM, Nico Kruber 
> 
> wrote:
> > Hi Michael,
> > from what I see, Java and Scala examples reside in different packages,
> > e.g.
> > * org.apache.flink.streaming.scala.examples.async.AsyncIOExample vs.
> > * org.apache.flink.streaming.examples.async.AsyncIOExample
> > 
> > A quick run on the Flink 1.3 branch revealed
> > flink-examples-streaming_2.10-1.3-SNAPSHOT.jar containing both (which you can
> > verify with your favorite archiver tool for zip files).
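
For example, a small, hypothetical check (instead of an archiver tool) that lists
both variants of the AsyncIOExample class contained in that jar:

import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ListExampleClasses {
    public static void main(String[] args) throws Exception {
        // Prints both the Java and the Scala AsyncIOExample entries, if present.
        try (JarFile jar = new JarFile("flink-examples-streaming_2.10-1.3-SNAPSHOT.jar")) {
            jar.stream()
               .map(JarEntry::getName)
               .filter(name -> name.endsWith("AsyncIOExample.class"))
               .forEach(System.out::println);
        }
    }
}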
> > 
> > Afaik, there is no simple switch to turn off Java or Scala examples. You
> > may
> > either adapt the pom.xml or create your own Project with the examples and
> > programming languages you need.
> > 
> > 
> > Nico
> > 
> > On Saturday, 23 September 2017 12:45:04 CEST Michael Fong wrote:
> > > Hi,
> > > 
> > > I am studying how to build a scala program from flink-examples/.
> > > 
> > > I can see there are two source folders java/ and scala/ from IntelliJ,
> > 
> > and
> > 
> > > for most examples, there is a copy of examples for Java and Scala.
> > > Executing 'mvn clean package -Pbuild-jar' would result in a jar file under
> > > target/. I am wondering if that is a Java or Scala example that I just
> > > compiled? In addition, is there a way to selectively choose the Java or Scala
> > > example to build with the current maven settings?
> > > 
> > > Thanks in advance,




Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-27 Thread Tony Wei
Hi Stefan,

It seems that I found something strange in the JM's log.

It had happened more than once before, but all subtasks would finish their
checkpoint attempts in the end.

2017-09-26 01:23:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1140 @ 1506389008690
2017-09-26 01:28:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1141 @ 1506389308690
2017-09-26 01:33:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1142 @ 1506389608690
2017-09-26 01:33:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140
expired before completing.
2017-09-26 01:38:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141
expired before completing.
2017-09-26 01:40:38,044 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 1140 from
c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:40:53,743 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 1141 from
c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:41:19,332 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 1142 (136733704 bytes in 457413 ms).

For chk #1245 and #1246, there was no late message from TM. You can refer
to the TM log. The full completed checkpoint attempt will have 12
(... asynchronous part) logs in general, but #1245 and #1246 only got 10
logs.

2017-09-26 10:08:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1245 @ 1506420508690
2017-09-26 10:13:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1246 @ 1506420808690
2017-09-26 10:18:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245
expired before completing.
2017-09-26 10:23:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246
expired before completing.

Moreover, I listed the directory for checkpoints on S3 and saw there were
two states not discarded successfully. In general, there will be 16 parts
for a completed checkpoint state.

2017-09-26 18:08:33 36919
tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
2017-09-26 18:13:34 37419
tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6

Hope this information is helpful. Thank you.

Best Regards,
Tony Wei

2017-09-27 16:14 GMT+08:00 Stefan Richter :

> Hi,
>
> thanks for the information. Unfortunately, I have no immediate idea what
> the reason is from the given information. I think most helpful could be a
> thread dump, but also metrics on the operator level to figure out
> which part of the pipeline is the culprit.
>
> Best,
> Stefan
>
> On 26.09.2017 at 17:55, Tony Wei wrote:
>
> Hi Stefan,
>
> There is no unknown exception in my full log. The Flink version is 1.3.2.
> My job is roughly like this.
>
> env.addSource(Kafka)
>   .map(ParseKeyFromRecord)
>   .keyBy()
>   .process(CountAndTimeoutWindow)
>   .asyncIO(UploadToS3)
>   .addSink(UpdateDatabase)
>
> It seemed all tasks stopped like the picture I sent in the last email.
>
> I will keep my eye on taking a thread dump from that JVM if this happens
> again.
>
> Best Regards,
> Tony Wei
>
> 2017-09-26 23:46 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> that is very strange indeed. I had a look at the logs and there is no
>> error or exception reported. I assume there is also no exception in your
>> full logs? Which version of flink are you using and what operators were
>> running in the task that stopped? If this happens again, would it be
>> possible to take a thread dump from that JVM?
>>
>> Best,
>> Stefan
>>
>> > On 26.09.2017 at 17:08, Tony Wei wrote:
>> >
>> > Hi,
>> >
>> > Something weird happened on my streaming job.
>> >
>> > I found my streaming job seems to be blocked for a long time and I saw
>> the situation like the picture below. (chk #1245 and #1246 were all
>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>> with the same state like #1247 until I restarted the TM.)
>> >
>> > 
>> >
>> > I'm not sure what happened, but the consumer stopped fetching records,
>> buffer usage is 100% and the following task did not seem to fetch data
>> anymore. Just like the whole TM was stopped.
>> >
>> > However, after I restarted TM and force the job restarting from the
>> latest completed checkpoint, everything worked again. And I don't know how
>> to reproduce it.
>> >
>> > The attachment is my TM log. Because there are many user logs and
>> sensitive information, I only remain the log from `org.apache.flink...`.
>> >
>> > My cluster setting is one JM and one TM with 4 available slots.
>> >
>> > Streaming

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-27 Thread Stefan Richter
Hi Tony,

are your checkpoints typically close to the timeout boundary? From what I see, 
writing the checkpoint is relatively fast but the time from the checkpoint 
trigger to execution seems very long. This is typically the case if your job 
has a lot of backpressure and therefore the checkpoint barriers take a long 
time to travel to the operators, because a lot of events are piling up in the 
buffers. Do you also experience large alignments for your checkpoints?

Best,
Stefan  

> On 27.09.2017 at 10:43, Tony Wei wrote:
> 
> Hi Stefan,
> 
> It seems that I found something strange from JM's log.
> 
> It had happened more than once before, but all subtasks would finish their 
> checkpoint attempts in the end.
> 
> 2017-09-26 01:23:28,690 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1140 @ 1506389008690
> 2017-09-26 01:28:28,690 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1141 @ 1506389308690
> 2017-09-26 01:33:28,690 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1142 @ 1506389608690
> 2017-09-26 01:33:28,691 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 
> expired before completing.
> 2017-09-26 01:38:28,691 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 
> expired before completing.
> 2017-09-26 01:40:38,044 WARN 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 1140 from 
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:40:53,743 WARN 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 1141 from 
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:41:19,332 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 1142 (136733704 bytes in 457413 ms).
> 
> For chk #1245 and #1246, there was no late message from TM. You can refer to 
> the TM log. The full completed checkpoint attempt will have 12 (... 
> asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
> 
> 2017-09-26 10:08:28,690 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1245 @ 1506420508690
> 2017-09-26 10:13:28,690 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1246 @ 1506420808690
> 2017-09-26 10:18:28,691 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 
> expired before completing.
> 2017-09-26 10:23:28,691 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 
> expired before completing.
> 
> Moreover, I listed the directory for checkpoints on S3 and saw there were two 
> states not discarded successfully. In general, there will be 16 parts for a 
> completed checkpoint state.
> 
> 2017-09-26 18:08:33 36919 
> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
> 2017-09-26 18:13:34 37419 
> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
> 
> Hope this information is helpful. Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-27 16:14 GMT+08:00 Stefan Richter  >:
> Hi,
> 
> thanks for the information. Unfortunately, I have no immediate idea what the 
> reason is from the given information. I think most helpful could be a thread 
> dump, but also metrics on the operator level to figure out which 
> part of the pipeline is the culprit.
> 
> Best,
> Stefan
> 
>> On 26.09.2017 at 17:55, Tony Wei wrote:
>> 
>> Hi Stefan,
>> 
>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>> My job is roughly like this.
>> 
>> env.addSource(Kafka)
>>   .map(ParseKeyFromRecord)
>>   .keyBy()
>>   .process(CountAndTimeoutWindow)
>>   .asyncIO(UploadToS3)
>>   .addSink(UpdateDatabase)
>> 
>> It seemed all tasks stopped like the picture I sent in the last email.
>> 
>> I will keep my eye on taking a thread dump from that JVM if this happens 
>> again.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-26 23:46 GMT+08:00 Stefan Richter > >:
>> Hi,
>> 
>> that is very strange indeed. I had a look at the logs and there is no error 
>> or exception reported. I assume there is also no exception in your full 
>> logs? Which version of flink are you using and what operators were running 
>> in the task that stopped? If this happens again, would it be possible to 
>> take a thread dump from that JVM?
>> 
>> Best,
>> Stefan
>> 
>> > On 26.09.2017 at 17:08, Tony Wei wrote:
>> >
>> > Hi,
>> >
>> > Something weird happened on my streaming job.
>> >
>> > I found my streaming

Re: StreamCorruptedException

2017-09-27 Thread Kostas Kloudas
Hi Sridhar,

From looking at your code:

1) The “KafkaDataSource” is a custom source that you implemented? Does this 
source buffer anything?
2) The getStreamSource2 seems to return again a "new 
KafkaDataSource”. Can this be a problem?
3) You are working on processing time and you are simply detecting whether 2 
messages of the same type arrive within 15 minutes, right? 
	I suppose that this could also be implemented using the times() 
quantifier (as sketched below), but this is just a matter of taste.
	Could you reduce this to a smaller duration and see if you still get a 
corrupted stream exception?
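
For reference, a hypothetical sketch of point 3 using the times() quantifier (the
Event type, its getType() accessor and the "ALERT" value are assumptions, not the
actual classes of your job):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// Two events of the same (assumed) type arriving within 15 minutes.
Pattern<Event, Event> twoWithinFifteenMinutes = Pattern.<Event>begin("repeat")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return "ALERT".equals(event.getType());
            }
        })
        .times(2)
        .within(Time.minutes(15));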

Thanks,
Kostas

> On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa  wrote:
> 
> One more point to add.
> 
> I disabled checkpoints (by commenting out code that calls 
> enableCheckpointing()) and re-ran the job, this time with plenty of memory for 
> the job manager:
> 
> ~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d
> 
> At the Jobmanager, I am still hitting:
> 
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Starting 
> YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, 
> Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Current 
> user: flink
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Maximum heap 
> size: 16384 MiBytes
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JAVA_HOME: 
> /usr/lib/jvm/java-8-openjdk-amd64
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Hadoop 
> version: 2.7.2
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JVM Options:
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx18432m
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_01/jobmanager.log
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlogback.configurationFile=file:logback.xml
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlog4j.configuration=file:log4j.properties
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Program 
> Arguments: (none)
> 
>   
>.
>   
>.
> 
> 2017-09-25 06:50:51,925 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map 
> -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) 
> switched from DEPLOYING to RUNNING.
> 2017-09-25 13:38:54,175 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Created BLOB cache storage directory 
> /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
> 2017-09-25 13:38:54,187 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from 
> localhost/127.0.0.1:0 
> 2017-09-25 16:30:39,974 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- 
> KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) 
> switched from RUNNING to CANCELED.
> 2017-09-25 16:30:39,975 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map 
> -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown 
> Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at 
> com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
> at 
> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
> 

Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-27 Thread XiangWei Huang
Hi Till,

I’ve found that a StandaloneMiniCluster doesn’t start up the web frontend 
when it is running. So, how can I cancel a running job on it via the REST API?

Cheers,
XiangWei

> On 20 September 2017, at 15:43, Till Rohrmann wrote:
> 
> Hi XiangWei,
> 
> programmatically there is no nice tooling yet to cancel jobs on a dedicated 
> cluster. What you can do is to use Flink's REST API to issue a cancel command 
> [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. 
> In the future we will improve the programmatic job control which will allow 
> you to do these kind of things more easily.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>  
> 
> 
> Cheers,
> Till
> 
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang  > wrote:
> Hi Till,
>
>  Thanks for your answer. It worked when I used StandaloneMiniCluster, but 
> another problem is that I can’t find a way to cancel
> a running Flink job without shutting down the cluster. For 
> LocalFlinkMiniCluster I can do it with the code below:
> 
>for (job <- cluster.getCurrentlyRunningJobsJava()) {
>   cluster.stopJob(job)
>}
> 
>Is it possible to cancel a running Flink job without shutting down a 
> StandaloneMiniCluster ?
> 
> Best Regards,
> XiangWei
> 
> 
> 
>> On 14 September 2017, at 16:58, Till Rohrmann wrote:
>> 
>> Hi XiangWei,
>> 
>> the problem is that the LocalFlinkMiniCluster can no longer be used in 
>> combination with a RemoteExecutionEnvironment. The reason is that the 
>> LocalFlinkMiniCluster uses now an internal leader election service and 
>> assigns leader ids to its components. Since this is an internal service it 
>> is not possible to retrieve this information like it is the case with the 
>> ZooKeeper based leader election services.
>> 
>> Long story short, the Flink Scala shell currently does not work with a 
>> LocalFlinkMiniCluster and would have to be fixed to work properly together 
>> with a local execution environment. Until then, I recommend starting a local 
>> standalone cluster and let the code run there.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang > > wrote:
>> dear all,
>> 
>> Below is the code i execute:
>> 
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>> 
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
>> InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
>> ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
>> LocalFlinkMiniCluster}
>> 
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>> 
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration;
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>> 
>>   override def isOpen: Boolean = {
>> isRunning.get()
>>   }
>> 
>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localClust

Re: Using latency markers

2017-09-27 Thread Martin Eden
Any follow-up on this? Jira? PR?

On Wed, Sep 13, 2017 at 11:30 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Aitozi,
>
> Yes, I think we haven’t really pinpointed the actual cause of the
> problem, but if you have a fix for that and can provide a PR we can
> definitely look at it! That would be helpful.
> Before opening a PR, also make sure to first open a JIRA for the issue (I
> don’t think there is one yet for this issue).
>
> Cheers,
> Gordon
>
> On 13 September 2017 at 12:14:42 PM, aitozi (gjying1...@gmail.com) wrote:
>
> Hi, Aljoscha,
>
> the dashboard shows NaN just because the value of the latencyGauge is not
> numerical, so it can't be shown in the dashboard. I removed the other
> latency descriptions except the sink's, so I can see the latency in the
> dashboard. Do I need to post a PR?
>
> thanks,
> Aitozi
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>


Re: Flink Application Jar file on Docker container

2017-09-27 Thread Rahul Raj
Hi Stefan,

I have a question in my mind out of curiosity: is it possible to run a
Flink application within a Docker container by using a Flink cluster set up on
the host?

Rahul Raj

On 26 September 2017 at 17:29, Stefan Richter 
wrote:

> Hi,
>
> if I correctly understood the approach outlined on github, you can start a
> standalone job manager and the task manager get the JM information either
> through the provided configuration or through Zookeeper. Take a look at the
> „running section“, e.g.:
>
> 1) „Via Mesos/Marathon: Start a standalone JobManager (you need to replace
> the flink_recovery_zookeeper_quorum variable with a valid setting for
> your cluster) [...]“
> 2) „Via standalone Docker: Start a standalone JobManager (with host
> networking, binding on 127.0.0.1) […]“
>
> Best,
> Stefan
>
>
> On 26.09.2017 at 12:43, Rahul Raj wrote:
>
> Hi Stefan,
>
> Thanks a lot for your answer and sharing the link
> https://github.com/mesoshq/flink. I went through this and saw its
> spawning Jobmanager and taskmanager. Now I think, this should be happening.
> First JobManager will be started on flink cluster on one node, then task
> manager will be started on another node and both should be running in
> docker containers on different nodes. Now, my question is how flink's
> JobManager will get to know about the taskManagers as they are in
> different docker containers on different nodes? Will it happen via Mesos?
>
> Can we use mesos-appmaster.sh instead which is already built in flink for
> deployment on mesos?
>
> Rahul Raj
>
> On 26 September 2017 at 15:32, Stefan Richter  > wrote:
>
>> Hi,
>>
>> as in my answer to your previous mail, I suggest to take a look at
>> https://github.com/mesoshq/flink . Unfortunately, there is not yet a lot
>> documentation about the internals of how this works, so I am also looping
>> in Till who might know more about specific questions about how things work
>> together exactly.
>>
>> Best,
>> Stefan
>>
>>
>> On 26.09.2017 at 09:21, Rahul Raj wrote:
>>
>> Currently I have a Flink Application Jar file running on Mesos cluster.
>> The flink application simply reads data from Kafka and put it to HDFS.
>>
>> Now we are planning to create a docker image to  run this application jar
>> file inside docker containers on Mesos cluster via Marathon.
>>
>> Below are the questions that I am looking answers for:
>>
>> 1. While building the docker image, how do I include flink-1.3.2 set up
>> and  my mesos config in flink?
>>
>> 2. How shall I run my existing flink application jar?
>>
>> 3. Will running my flink application jar on docker containers will run it
>> on mesos slaves on different docker containers? How docker , Flink, mesos ,
>> Marathon will work together in my case?
>>
>> Rahul Raj
>>
>>
>>
>
>


Re: Add custom configuration files to TMs classpath on YARN

2017-09-27 Thread Mikhail Pryakhin
Hi Nico,

Thanks a lot for your help, but unfortunately, the workaround you suggested 
doesn't work for me.
I tried to leverage the StreamExecutionEnvironment#registerCachedFile method 
but failed because this instance is created when the application master has 
already been started; therefore, the classpath to run the application somewhere 
on the YARN cluster has already been created by means of 
org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to pass 
a local folder at the moment I submit the application so that it is included in 
the application YARN classpath.
The option you suggested works well if I need to cache a file that is available 
for me at the moment I want to register it (for example a file on HDFS).

Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to pass 
user-specified folders to the YARN application classpath?



Kind Regards,
Mike Pryakhin



> On 21 Jun 2017, at 16:55, Mikhail Pryakhin  wrote:
> 
> Hi Nico!
> Sounds great, will give it a try and return back with results soon.
> 
> Thank you so much for your help!!
> 
> Kind Regards,
> Mike Pryakhin
> 
>> On 21 Jun 2017, at 16:36, Nico Kruber > > wrote:
>> 
>> A workaround may be to use the DistributedCache. It apparently is not 
>> documented much but the JavaDoc mentions roughly how to use it:
>> 
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>> 
>> /**
>> * Registers a file at the distributed cache under the given name. The file 
>> will 
>> be accessible
>> * from any user-defined function in the (distributed) runtime under a local 
>> path. Files
>> * may be local files (as long as all relevant workers have access to it), or 
>> files in a distributed file system.
>> * The runtime will copy the files temporarily to a local cache, if needed.
>> * 
>> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be 
>> obtained inside UDFs via
>> * {@link 
>> org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
>> provides access
>> * {@link org.apache.flink.api.common.cache.DistributedCache} via 
>> * {@link 
>> org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>> * 
>> * @param filePath The path of the file, as a URI (e.g. "file:///some/path" 
>> or 
>> "hdfs://host:port/and/path")
>> * @param name The name under which the file is registered.
>> */
>> public void registerCachedFile(String filePath, String name){
>>  registerCachedFile(filePath, name, false);
>> }
>> 
>> You could pass the actual file URL to use for each instance of your job that 
>> requires a different file via a simple job parameter:
>> 
>> 
>> public static void main(String[] args) throws Exception {
>>  ParameterTool params = ParameterTool.fromArgs(args);
>> 
>>  ...
>> 
>>  env.registerCachedFile(params.get("config_file"), "extConfig");
>> 
>>  ...
>> }
>> 
>> Flink's DistributedCache will then cache the file locally and you can use it 
>> in 
>> a RichFunction like in
>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/
>> apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>> 
>> public class MyFunction extends AbstractRichFunction {
>>  private static final long serialVersionUID = 1L;
>> 
>>  @Override
>>  public void open(Configuration conf) throws IOException {
>>  File file = 
>> getRuntimeContext().getDistributedCache().getFile("extConfig");
>> ...
>>  }
>> }
>> 
>> 
>> Nico
>> 
>> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>>> Hi guys,
>>> 
>>> any news?
>>> I’ve created a jira-ticket https://issues.apache.org/jira/browse/FLINK-6949
>>> .
>>> 
>>> 
>>> Kind Regards,
>>> Mike Pryakhin
>>> 
 On 16 Jun 2017, at 16:35, Mikhail Pryakhin  wrote:
 
 Hi all,
 
 I run my flink job on yarn cluster and need to supply job configuration
 parameters via configuration file alongside with the job jar.
 (configuration file can't be packaged into jobs jar file). I tried to put
 the configuration file into the folder that is passed via --yarnship
 option to the flink run command, then this file is copied to the yarn
 cluster and added to JVM class path like 'path/application.conf' but is
 ignored by TM JVM as it is neither jar(zip) file nor directory...
 
 I looked through the YarnClusterDescriptor class where the
 ENV_FLINK_CLASSPATH is built and haven't found any option to tell
 flink (YarnClusterDescriptor especially) to add my configuration file to
 the TM JVM classpath... Is there any way to do so? If not, would you consider
 adding such an ability to add files? (Like in Spark, where I can just pass any
 files via the --files option.)
 
 

CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-27 Thread Yunus Olgun
Hi,

I have a simple streaming job such as:

source.process(taskA)
   .process(taskB)

I want taskB to access the minimum watermark of all parallel taskA instances, but 
the data is ordered and should not be shuffled. The ForwardPartitioner uses the 
watermark of only one predecessor. So, I have used a custom partitioner.

source.process(taskA)
   .map(AssignPartitionID)
   .partitionCustom(IdPartitioner)
   .map(StripPartitionID)
   .process(taskB)

In the AssignPartitionID function, I attach 
getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. In 
IdPartitioner, I return this partitionId.
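
A hypothetical sketch of those two pieces (the class layout, the Tuple2 wrapping
and the field-based partitionCustom call are assumptions, not the actual code):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Attaches the index of the current taskA subtask to each record.
public class AssignPartitionID<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
    @Override
    public Tuple2<Integer, T> map(T value) {
        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

// Routes each record to the downstream channel with the same index.
public class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer partitionId, int numPartitions) {
        return partitionId;
    }
}

// Wiring, using the attached index (tuple field 0) as the partition key:
// stream.map(new AssignPartitionID<MyEvent>())
//       .partitionCustom(new IdPartitioner(), 0)
//       .map(new StripPartitionID<MyEvent>())
//       .process(taskB);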

This solved the main requirement but I have another concern now,

Network shuffle: I don’t need a network shuffle. I thought within a 
taskmanager, the indexId of taskA subtasks would be the same as the indexId of taskB 
subtasks. Unfortunately, they are not. Is there a way to make partitionCustom 
distribute data like ForwardPartitioner, to the next local operator? 

As far as I know, this still requires object serialization/deserialization since 
operators can’t be chained anymore. Is there a way to get the minimum watermark 
from upstream operators without a network shuffle and object 
serialization/deserialization?

Regards,

Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-27 Thread Kostas Kloudas
Hi Yunus,

I am not sure if I understand correctly the question.

Am I correct to assume that you want the following?

———> time

       Process A                   Process B

Task1: W(3) E(1) E(2) E(5)  W(3) W(7) E(1) E(2) E(5)

Task2: W(7) E(3) E(10) E(6) W(3) W(7) E(3) E(10) E(6)


In the above, elements flow from left to right and W() stands for watermark and 
E() stands for element.
In other words, between Process(TaskA) and Process(TaskB) you want to only 
forward the elements, but broadcast the watermarks, right?

If this is the case, a trivial solution would be to set the parallelism of 
TaskB to 1, so that all elements go through the same node.

One other solution is what you did, BUT by using a custom partitioner you 
cannot use keyed state in your process function B because the 
stream is no longer keyed.

A similar approach to what you did, but without the limitation above, is that in 
the first process function (TaskA) you can append the 
taskId to the elements themselves and then do a keyBy(taskId) between the first 
and the second process function.
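
For illustration, a minimal sketch of this keyBy variant, reusing the hypothetical
Tuple2<Integer, T> wrapping from the earlier sketch (tuple field 0 holds the taskA
subtask index; TaskB stands for an assumed process function class):

// taskAOutput is the output of the first process function (taskA).
taskAOutput
    .map(new AssignPartitionID<MyEvent>())   // attach the subtask index
    .keyBy(0)                                // key by the attached index
    .process(new TaskB());                   // keyed state is usable in TaskB here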

These are the solutions that I can come up with, assuming that you want to do 
what I described.

But in general, could you please describe your use case a bit more? 
This way we may figure out another approach to achieve your goal. 
In fact, I am not sure if you gain anything by broadcasting the watermark, 
other than 
re-implementing (to some extent) Flink’s windowing mechanism.

Thanks,
Kostas

> On Sep 27, 2017, at 4:35 PM, Yunus Olgun  wrote:
> 
> Hi,
> 
> I have a simple streaming job such as:
> 
> source.process(taskA)
>   .process(taskB)
> 
> I want taskB to access minimum watermark of all parallel taskA instances, but 
> the data is ordered and should not be shuffled. ForwardPartitioner uses 
> watermark of only one predecessor. So, I have used a customPartitioner.
> 
> source.process(taskA)
>   .map(AssignPartitionID)
>   .partitionCustom(IdPartitioner)
>   .map(StripPartitionID)
>   .process(taskB)
> 
> At AssignPartitionID function, I attach 
> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. 
> At IdPartitioner, I return this partitionId.
> 
> This solved the main requirement but I have another concern now,
> 
> Network shuffle: I don’t need a network shuffle. I thought within a 
> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
> subtasks. Unfortunately, they are not. Is there a way to make partitionCustom 
> distribute data like ForwardPartitioner, to the next local operator? 
> 
> As I know, this still requires object serialization/deserialization since 
> operators can’t be chained anymore. Is there a way to get minimum watermark 
> from upstream operators without network shuffle and object 
> serilization/deserialization?
> 
> Regards,



Re: Add custom configuration files to TMs classpath on YARN

2017-09-27 Thread Robert Metzger
Hi Mike,

For using the DistributedCache approach, you need to have HDFS or another
distributed FS available to distribute the files.

I would actually like to understand why you said " then this file is copied
to the yarn cluster and added to JVM class  [...] but is ignored by TM JVM
as it is neither jar(zip) file nor directory..."
I believe you should be able to load non-class files through the
classloader as well.
Did you see any code that excludes non-class files? Afaik the Taskmanagers
have access to all files (of any type) that are passed using the --ship
command (or in the lib/ folder).
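
For example, a minimal sketch of that classloader lookup from inside a rich
function on the TM (assuming the shipped directory really does end up on the TM
classpath; the file name application.conf comes from the earlier mails):

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// e.g. inside RichMapFunction#open():
Properties conf = new Properties();
try (InputStream in = getClass().getClassLoader()
        .getResourceAsStream("application.conf")) {
    if (in == null) {
        throw new IOException("application.conf not found on the TM classpath");
    }
    conf.load(in);
}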


On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin 
wrote:

> Hi Nico,
>
> Thanks a lot for you help, but unfortunately, the workaround you suggested
> doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile
> method but failed because this instance is created when the application
> master has already been started therefore the classpath to run the
> application somewhere on YARN cluster has already been created by means of
> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to
> pass a local folder at the moment I submit the application so that it is
> included in the application YARN classpath.
> The option you suggested works well if I need to cache a file that is
> available for me at the moment I want to register it (for example a file on
> HDFS).
>
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to
> pass user-specified folders to the YARN application classpath?
>
>
>
> Kind Regards,
> Mike Pryakhin
>
>
>
> On 21 Jun 2017, at 16:55, Mikhail Pryakhin  wrote:
>
> Hi Nico!
> Sounds great, will give it a try and return back with results soon.
>
> Thank you so much for your help!!
>
> Kind Regards,
> Mike Pryakhin
>
> On 21 Jun 2017, at 16:36, Nico Kruber  wrote:
>
> A workaround may be to use the DistributedCache. It apparently is not
> documented much but the JavaDoc mentions roughly how to use it:
>
> https://github.com/apache/flink/blob/master/flink-java/
> src/main/java/org/apache/
> flink/api/java/ExecutionEnvironment.java#L954
>
> /**
> * Registers a file at the distributed cache under the given name. The file
> will
> be accessible
> * from any user-defined function in the (distributed) runtime under a
> local
> path. Files
> * may be local files (as long as all relevant workers have access to it),
> or
> files in a distributed file system.
> * The runtime will copy the files temporarily to a local cache, if needed.
> * 
> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be
> obtained inside UDFs via
> * {@link
> org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
> and
> provides access
> * {@link org.apache.flink.api.common.cache.DistributedCache} via
> * {@link
> org.apache.flink.api.common.functions.RuntimeContext#
> getDistributedCache()}.
> *
> * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
> or
> "hdfs://host:port/and/path")
> * @param name The name under which the file is registered.
> */
> public void registerCachedFile(String filePath, String name){
> registerCachedFile(filePath, name, false);
> }
>
> You could pass the actual file URL to use for each instance of your job
> that
> requires a different file via a simple job parameter:
>
>
> public static void main(String[] args) throws Exception {
> ParameterTool params = ParameterTool.fromArgs(args);
>
> ...
>
> env.registerCachedFile(params.get("config_file"), "extConfig");
>
> ...
> }
>
> Flink's DistributedCache will then cache the file locally and you can use
> it in
> a RichFunction like in
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/
> apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>
> public class MyFunction extends AbstractRichFunction {
> private static final long serialVersionUID = 1L;
>
> @Override
> public void open(Configuration conf) throws IOException {
> File file =
> getRuntimeContext().getDistributedCache().getFile("extConfig");
> ...
> }
> }
>
>
> Nico
>
> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>
> Hi guys,
>
> any news?
> I’ve created a jira-ticket https://issues.apache.org/
> jira/browse/FLINK-6949
> .
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 16 Jun 2017, at 16:35, Mikhail Pryakhin  wrote:
>
> Hi all,
>
> I run my flink job on yarn cluster and need to supply job configuration
> parameters via configuration file alongside with the job jar.
> (configuration file can't be packaged into jobs jar file). I tried to put
> the configuration file into the folder that is passed via --yarnship
> option to the flink run command, then this file is copied to the yarn
> cluster and added to JVM class path like 'path/application.conf' but is
> ignored by TM JVM as it is neither jar(zip) file nor directory...
>
> I looked through the YarnClusterDescriptor class where

Re: Custom Serializers

2017-09-27 Thread nragon
Should I use TypeSerializerSingleton if it is independent of the object which
it's serializing?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Application Jar file on Docker container

2017-09-27 Thread Stefan Richter
Hi,

off the top of my head, I cannot see why this should not be possible; task 
managers just need to be able to connect to their job manager. Unfortunately, I 
cannot give a real guarantee here because I am not that deeply involved in this 
aspect of Flink.

Best,
Stefan

> On 27.09.2017 at 12:39, Rahul Raj wrote:
> 
> Hi Stefan,
> 
> I have a question in my mid out of curiosity Is it possible to run flink 
> application within docker container by using flink cluster set up on host?
> 
> Rahul Raj
> 
> On 26 September 2017 at 17:29, Stefan Richter  > wrote:
> Hi,
> 
> if I correctly understood the approach outlined on github, you can start a 
> standalone job manager and the task manager get the JM information either 
> through the provided configuration or through Zookeeper. Take a look at the 
> „running section“, e.g.:
> 
> 1) „Via Mesos/Marathon: Start a standalone JobManager (you need to replace 
> the flink_recovery_zookeeper_quorum variable with a valid setting for your 
> cluster) [...]“
> 2) „Via standalone Docker: Start a standalone JobManager (with host 
> networking, binding on 127.0.0.1) […]“
> 
> Best,
> Stefan
> 
> 
>> On 26.09.2017 at 12:43, Rahul Raj wrote:
>> 
>> Hi Stefan,
>> 
>> Thanks a lot for your answer and sharing the link 
>> https://github.com/mesoshq/flink . I went 
>> through this and saw its spawning Jobmanager and taskmanager. Now I think, 
>> this should be happening. First JobManager will be started on flink cluster 
>> on one node, then task manager will be started on another node and both 
>> should be running in docker containers on different nodes. Now, my question 
>> is how flink's JobManager will get to know about the taskManagers as they 
>> are in in different docker containers on different nodes? Will it happen via 
>> Mesos?
>> 
>> Can we use mesos-appmaster.sh instead which is already built in flink for 
>> deployment on mesos?
>> 
>> Rahul Raj  
>> 
>> On 26 September 2017 at 15:32, Stefan Richter > > wrote:
>> Hi,
>> 
>> as in my answer to your previous mail, I suggest to take a look at 
>> https://github.com/mesoshq/flink  . 
>> Unfortunately, there is not yet a lot documentation about the internals of 
>> how this works, so I am also looping in Till who might know more about 
>> specific questions about how things work together exactly.
>> 
>> Best,
>> Stefan
>>  
>>> Am 26.09.2017 um 09:21 schrieb Rahul Raj:
>>> 
>>> Currently I have a Flink Application Jar file running on Mesos cluster. The 
>>> flink application simply reads data from Kafka and put it to HDFS.
>>> 
>>> Now we are planning to create a docker image to  run this application jar 
>>> file inside docker containers on Mesos cluster via Marathon. 
>>> 
>>> Below are the questions that I am looking answers for:
>>> 
>>> 1. While building the docker image, how do I include flink-1.3.2 set up and 
>>>  my mesos config in flink?
>>> 
>>> 2. How shall I run my existing flink application jar?
>>> 
>>> 3. Will running my flink application jar on docker containers will run it 
>>> on mesos slaves on different docker containers? How docker , Flink, mesos , 
>>> Marathon will work together in my case?
>>> 
>>> Rahul Raj
>> 
>> 
> 
> 



Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-27 Thread Yunus Olgun
Hi Kostas,

Yes, you have summarized well. I want to only forward the data to the next 
local operator, but broadcast the watermark through the cluster.

- I can’t set parallelism of taskB to 1. The stream is too big for that. Also, 
the data is ordered at each partition. I don’t want to change that order.

- I don’t need KeyedStream. Also taskA and taskB will always have the same 
parallelism with each other. But this parallelism can be increased in the 
future.

The use case is: the source is Kafka. At our peak hours, or when we want to run
the streaming job with old data from Kafka, the same thing always happens, even
with trivial jobs. Some consumers consume faster than others. They produce too
much data downstream, but the watermark advances slowly, at the speed of the
slowest consumer. This extra data gets piled up at downstream operators. When
the downstream operator is an aggregation, that is OK. But when it is an in-Flink
join, the state size gets too big, checkpoints take much longer, and overall the
job becomes slower or fails. It also affects other jobs on the cluster.

So, basically I want to implement a throttler. It compares the timestamp of a
record with the global watermark. If the difference is larger than a constant
threshold, it sleeps 1 ms for each incoming record. This way, fast
operators wait for the slowest one.
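
A minimal sketch of that throttling idea as a ProcessFunction; the class name and
the 60 s threshold are illustrative, and it assumes the watermark visible at this
operator is the globally broadcast one that this thread is trying to obtain:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ThrottlingFunction<T> extends ProcessFunction<T, T> {

    private static final long MAX_EVENT_TIME_LAG_MS = 60_000;

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        Long elementTimestamp = ctx.timestamp();
        long currentWatermark = ctx.timerService().currentWatermark();

        // ignore the initial Long.MIN_VALUE watermark, then slow down partitions
        // whose elements run too far ahead of the slowest one
        if (elementTimestamp != null
                && currentWatermark > Long.MIN_VALUE
                && elementTimestamp - currentWatermark > MAX_EVENT_TIME_LAG_MS) {
            Thread.sleep(1);
        }
        out.collect(value);
    }
}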

The only problem is that this solution came at the cost of one network shuffle
and data serialization/deserialization. Since the stream is large, I want to
avoid at least the network shuffle.

I thought operator instances within a taskmanager would get the same indexId, 
but apparently this is not the case.

Thanks,

> On 27. Sep 2017, at 17:16, Kostas Kloudas  wrote:
> 
> Hi Yunus,
> 
> I am not sure if I understand correctly the question.
> 
> Am I correct to assume that you want the following?
> 
>                ———> time
> 
>          ProcessA                      ProcessB
> 
> Task1: W(3) E(1) E(2) E(5)       W(3) W(7) E(1) E(2) E(5)
> 
> Task2: W(7) E(3) E(10) E(6)      W(3) W(7) E(3) E(10) E(6)
> 
> 
> In the above, elements flow from left to right and W() stands for watermark 
> and E() stands for element.
> In other words, between Process(TaksA) and Process(TaskB) you want to only 
> forward the elements, but broadcast the watermarks, right?
> 
> If this is the case, a trivial solution would be to set the parallelism of 
> TaskB to 1, so that all elements go through the same node.
> 
> One other solution is what you did, BUT by using a custom partitioner you 
> cannot use keyed state in your process function B because the 
> stream is no longer keyed.
> 
> A similar approach to what you did but without the limitation above, is that 
> in the first processFunction (TaskA) you can append the 
> taskId to the elements themselves and then do a keyBy(taskId) between the 
> first and the second process function.
> 
> These are the solutions that I can come up with, assuming that you want to do 
> what I described.
> 
> But in general, could you please describe a bit more what is your use case? 
> This way we may figure out another approach to achieve your goal. 
> In fact, I am not sure if you earn anything by broadcasting the watermark, 
> other than 
> re-implementing (to some extent) Flink’s windowing mechanism.
> 
> Thanks,
> Kostas
> 
>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun wrote:
>> 
>> Hi,
>> 
>> I have a simple streaming job such as:
>> 
>> source.process(taskA)
>>   .process(taskB)
>> 
>> I want taskB to access minimum watermark of all parallel taskA instances, 
>> but the data is ordered and should not be shuffled. ForwardPartitioner uses 
>> watermark of only one predecessor. So, I have used a customPartitioner.
>> 
>> source.process(taskA)
>>   .map(AssignPartitionID)
>>   .partitionCustom(IdPartitioner)
>>   .map(StripPartitionID)
>>   .process(taskB)
>> 
>> At AssignPartitionID function, I attach 
>> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. 
>> At IdPartitioner, I return this partitionId.
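
For reference, a minimal sketch of how the two helpers described above could look;
the element type T is an assumption, since the actual type is not shown in the post:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

class AssignPartitionID<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
    @Override
    public Tuple2<Integer, T> map(T value) {
        // tag each element with the index of the subtask that produced it
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        // route each element to the channel matching the attached subtask index
        return key % numPartitions;
    }
}

StripPartitionID would then presumably map each Tuple2 back to its f1 field, and the
wiring described above would use .partitionCustom(new IdPartitioner(), 0).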
>> 
>> This solved the main requirement but I have another concern now,
>> 
>> Network shuffle: I don’t need a network shuffle. I thought within a 
>> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
>> subtasks. Unfortunately, they are not. Is there a way to make 
>> partitionCustom distribute data like ForwardPartitioner, to the next local 
>> operator? 
>> 
>> As far as I know, this still requires object serialization/deserialization since
>> operators can’t be chained anymore. Is there a way to get the minimum watermark
>> from upstream operators without a network shuffle and object
>> serialization/deserialization?
>> 
>> Regards,
> 



Re: Add custom configuration files to TMs classpath on YARN

2017-09-27 Thread Mikhail Pryakhin
Hi Robert,

Thanks for your reply!

>I believe you should be able to load non-class files through the classloader 
>as well.
Could you please clarify what you mean by this?

>Did you see any code that excludes non-class files? 
No I didn't, but I did see the following code here [1]:

if (shipFile.isDirectory()) {
    // add directories to the classpath
    java.nio.file.Path shipPath = shipFile.toPath();
    final java.nio.file.Path parentPath = shipPath.getParent();

    Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
        @Override
        public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
                throws IOException {
            java.nio.file.Path relativePath = parentPath.relativize(file);

            classPaths.add(relativePath.toString());

            return FileVisitResult.CONTINUE;
        }
    });
} else {
    // add files to the classpath
    classPaths.add(shipFile.getName());
}

The code above traverses the contents of the folder I passed via the --yarnship
option and appends non-class files to the classpath when the ship file is a
directory. That eventually gives no result, since only the following can be put
on a JVM classpath: .class files, .jar files, .zip files, or folders.

I believe that if the code above didn't traverse the directory contents,
everything would work as expected.

For instance, if I pass a file then it is appended to the classpath as is, and if
I specify a folder then it should go onto the classpath as a folder.
At the moment it is not possible to pass multiple --yarnship options, but I have
also created another JIRA ticket [2] that proposes adding the ability to specify
multiple yarnship folders.
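
To make the expectation concrete, here is a minimal sketch of the classloader-based
lookup this thread is discussing; it can only succeed if the shipped directory
itself (not the individual files inside it) ends up on the TM classpath
("application.conf" is just the example file name used earlier in the thread):

import java.io.FileNotFoundException;
import java.io.InputStream;

public class ShippedConfigLookup {
    public static InputStream openApplicationConf() throws FileNotFoundException {
        // resolves the file relative to the classpath entries of the task manager JVM
        InputStream in = Thread.currentThread()
                .getContextClassLoader()
                .getResourceAsStream("application.conf");
        if (in == null) {
            throw new FileNotFoundException("application.conf is not on the classpath");
        }
        return in;
    }
}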

What do you think about that?

[1] https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003
[2] https://issues.apache.org/jira/browse/FLINK-6950



Thanks in advance

Kind Regards,
Mike Pryakhin



> On 27 Sep 2017, at 18:30, Robert Metzger  wrote:
> 
> Hi Mike,
> 
> For using the DistributedCache approach, you need to have HDFS or another 
> distributed FS available to distribute the files.
> 
> I would actually like to understand why you said " then this file is copied 
> to the yarn cluster and added to JVM class  [...] but is ignored by TM JVM as 
> it is neither jar(zip) file nor directory..."
> I believe you should be able to load non-class files through the classloader 
> as well.
> Did you see any code that excludes non-class files? Afaik the Taskmanagers 
> have access to all files (of any type) that are passed using the --ship 
> command (or in the lib/ folder).
> 
> 
> On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin wrote:
> Hi Nico,
> 
> Thanks a lot for your help, but unfortunately, the workaround you suggested
> doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile method
> but failed, because this instance is created after the application master has
> already been started; by then the classpath used to run the application somewhere
> on the YARN cluster has already been created by means of
> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to
> pass a local folder at the moment I submit the application so that it is
> included in the application's YARN classpath.
> The option you suggested works well if I need to cache a file that is 
> available for me at the moment I want to register it (for example a file on 
> HDFS).
> 
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to 
> pass user-specified folders to the YARN application classpath?
> 
> 
> 
> Kind Regards,
> Mike Pryakhin
> 
> 
> 
>> On 21 Jun 2017, at 16:55, Mikhail Pryakhin wrote:
>> 
>> Hi Nico!
>> Sounds great, will give it a try and return back with results soon.
>> 
>> Thank you so much for your help!!
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 
>>> On 21 Jun 2017, at 16:36, Nico Kruber wrote:
>>> 
>>> A workaround may be to use the DistributedCache. It apparently is not 
>>> documented much but the JavaDoc mentions roughly how to use it:
>>> 
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>>> 
>>> /**
>>> * Registers a file at the distributed cache under the given name. The file 
>>> will 
>>> be accessible
>>> * from any user-defined function in the (distributed) runtime und

Programmatic configuration

2017-09-27 Thread Dustin Jenkins
Hello,

I’m running a single Flink Job Manager with a Task Manager in Docker containers
with Java 8. They are remotely located (flink.example.com).

I’m submitting a job from my desktop and passing the job to the Job Manager
with -m flink.example.com:6123, which seems to work well. I’m doing a search on
an S3 system located at s3.example.com.

The problem is that in order to have access to the S3 system, the 
$HADOOP_CONFIG/core-site.xml and $FLINK_HOME/flink-conf.yaml need to be 
configured for it at the Job Manager and Task Manager level, which means they 
are tied to that particular endpoint (including my access key and secret key).  
Is there some way I can specify the configuration only in my application so I
can leave my Flink server cluster mostly generic?
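
A minimal sketch of the kind of per-application configuration being asked about,
using the standard Hadoop s3a keys (whether Flink's file systems would pick this up
instead of the cluster-wide core-site.xml is exactly the open question here):

import org.apache.hadoop.conf.Configuration;

public class ProgrammaticS3Config {
    public static Configuration s3Configuration() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "s3.example.com");    // endpoint from this post
        conf.set("fs.s3a.access.key", "<access-key>");    // placeholders, not real credentials
        conf.set("fs.s3a.secret.key", "<secret-key>");
        return conf;
    }
}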

Thank you!
Dustin  



Re: Flink Application Jar file on Docker container

2017-09-27 Thread Eron Wright
There was also a talk about containerization at Flink Forward that touches
on some of your questions.
https://www.youtube.com/watch?v=w721NI-mtAA&t=2s&list=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X&index=33

Eron

On Wed, Sep 27, 2017 at 9:12 AM, Stefan Richter  wrote:

> Hi,
>
> Off the top of my head, I cannot see why this should not be possible;
> task managers just need to be able to connect to their job manager.
> Unfortunately, I cannot give a real guarantee here because I am not that
> deeply involved in this aspect of Flink.
>
> Best,
> Stefan
>
> Am 27.09.2017 um 12:39 schrieb Rahul Raj :
>
> Hi Stefan,
>
> I have a question in my mind out of curiosity: is it possible to run a
> Flink application within a Docker container by using a Flink cluster set up on
> the host?
>
> Rahul Raj
>
> On 26 September 2017 at 17:29, Stefan Richter wrote:
>
>> Hi,
>>
>> if I correctly understood the approach outlined on github, you can start
>> a standalone job manager and the task manager get the JM information either
>> through the provided configuration or through Zookeeper. Take a look at the
>> „running section“, e.g.:
>>
>> 1) „Via Mesos/Marathon: Start a standalone JobManager (you need to
>> replace the flink_recovery_zookeeper_quorum variable with a valid
>> setting for your cluster) [...]“
>> 2) „Via standalone Docker: Start a standalone JobManager (with host
>> networking, binding on 127.0.0.1) […]“
>>
>> Best,
>> Stefan
>>
>>
>> Am 26.09.2017 um 12:43 schrieb Rahul Raj :
>>
>> Hi Stefan,
>>
>> Thanks a lot for your answer and sharing the link
>> https://github.com/mesoshq/flink. I went through it and saw it spawning a
>> JobManager and a TaskManager. Now I think this should be happening.
>> First JobManager will be started on flink cluster on one node, then task
>> manager will be started on another node and both should be running in
>> docker containers on different nodes. Now, my question is how flink's
>> JobManager will get to know about the TaskManagers, as they are in
>> different docker containers on different nodes? Will it happen via Mesos?
>>
>> Can we use mesos-appmaster.sh instead which is already built in flink for
>> deployment on mesos?
>>
>> Rahul Raj
>>
>> On 26 September 2017 at 15:32, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> as in my answer to your previous mail, I suggest to take a look at
>>> https://github.com/mesoshq/flink . Unfortunately, there is not yet a
>>> lot documentation about the internals of how this works, so I am also
>>> looping in Till who might know more about specific questions about how
>>> things work together exactly.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> Am 26.09.2017 um 09:21 schrieb Rahul Raj :
>>>
>>> Currently I have a Flink Application Jar file running on Mesos cluster.
>>> The flink application simply reads data from Kafka and put it to HDFS.
>>>
>>> Now we are planning to create a docker image to  run this application
>>> jar file inside docker containers on Mesos cluster via Marathon.
>>>
>>> Below are the questions that I am looking answers for:
>>>
>>> 1. While building the docker image, how do I include flink-1.3.2 set up
>>> and  my mesos config in flink?
>>>
>>> 2. How shall I run my existing flink application jar?
>>>
>>> 3. Will running my flink application jar on docker containers will run
>>> it on mesos slaves on different docker containers? How docker , Flink,
>>> mesos , Marathon will work together in my case?
>>>
>>> Rahul Raj
>>>
>>>
>>>
>>
>>
>
>


Re: Add custom configuration files to TMs classpath on YARN

2017-09-27 Thread Haohui Mai
What we did internally is to inherit from AbstractYarnClusterDescriptor and
to customize from there.

It's not too difficult, but it would be nice to see this taken care of by
AbstractYarnClusterDescriptor itself.

~Haohui

On Wed, Sep 27, 2017 at 9:36 AM Mikhail Pryakhin 
wrote:

> Hi Robert,
>
> Thanks for your reply!
>
> >I believe you should be able to load non-class files through the
> classloader as well.
> Could you please clarify what you mean by this?
>
> >Did you see any code that excludes non-class files?
> No I didn't, but I did see the following code here [1]:
>
> if (shipFile.isDirectory()) {
>     // add directories to the classpath
>     java.nio.file.Path shipPath = shipFile.toPath();
>     final java.nio.file.Path parentPath = shipPath.getParent();
>
>     Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
>         @Override
>         public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
>                 throws IOException {
>             java.nio.file.Path relativePath = parentPath.relativize(file);
>
>             classPaths.add(relativePath.toString());
>
>             return FileVisitResult.CONTINUE;
>         }
>     });
> } else {
>     // add files to the classpath
>     classPaths.add(shipFile.getName());
> }
>
> The code above traverses the contents of the folder I passed via the --yarnship
> option and appends non-class files to the classpath when the ship file is a
> directory. That eventually gives no result, since only the following can be put
> on a JVM classpath: .class files, .jar files, .zip files, or folders.
>
> I believe that if the code above didn't traverse the directory contents,
> everything would work as expected.
>
> For instance, if I pass a file then it is appended to the classpath as is, and
> if I specify a folder then it should go onto the classpath as a folder.
> At the moment it is not possible to pass multiple --yarnship options, but I
> have also created another JIRA ticket [2] that proposes adding the ability to
> specify multiple yarnship folders.
>
> What do you think about that?
>
> [1]
> https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003:
> [2] https://issues.apache.org/jira/browse/FLINK-6950
>
>
> Thanks in advance
>
> Kind Regards,
> Mike Pryakhin
>
>
>
> On 27 Sep 2017, at 18:30, Robert Metzger  wrote:
>
> Hi Mike,
>
> For using the DistributedCache approach, you need to have HDFS or another
> distributed FS available to distribute the files.
>
> I would actually like to understand why you said " then this file is
> copied to the yarn cluster and added to JVM class  [...] but is ignored
> by TM JVM as it is neither jar(zip) file nor directory..."
> I believe you should be able to load non-class files through the
> classloader as well.
> Did you see any code that excludes non-class files? Afaik the Taskmanagers
> have access to all files (of any type) that are passed using the --ship
> command (or in the lib/ folder).
>
>
> On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin 
> wrote:
>
>> Hi Nico,
>>
>> Thanks a lot for your help, but unfortunately, the workaround you
>> suggested doesn't work for me.
>> I tried to leverage the StreamExecutionEnvironment#registerCachedFile
>> method but failed, because this instance is created after the application
>> master has already been started; by then the classpath used to run the
>> application somewhere on the YARN cluster has already been created by means of
>> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to
>> pass a local folder at the moment I submit the application so that it is
>> included in the application's YARN classpath.
>> The option you suggested works well if I need to cache a file that is
>> available for me at the moment I want to register it (for example a file on
>> HDFS).
>>
>> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to
>> pass user-specified folders to the YARN application classpath?
>>
>>
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>>
>>
>> On 21 Jun 2017, at 16:55, Mikhail Pryakhin  wrote:
>>
>> Hi Nico!
>> Sounds great, will give it a try and return back with results soon.
>>
>> Thank you so much for your help!!
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>> On 21 Jun 2017, at 16:36, Nico Kruber  wrote:
>>
>> A workaround may be to use the DistributedCache. It apparently is not
>> documented much but the JavaDoc mentions roughly how to use it:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/
>> flink/api/java/ExecutionEnvironment.java#L954
>>
>> /**
>> * Registers a file at the distributed cache under the given name. The
>> file will
>> be accessible
>> * from any user-defined function in the (distributed) runtime under a
>> local
>> path. Files
>> * may be local files (as long as all relevant workers have access to it),
>> or
>> files in a distributed file system.
>> * The runtime will copy the files temporarily to a local cache, if needed.
>> * 
>> * The {@link org.apache.flink.a

Issue with CEP library

2017-09-27 Thread Ajay Krishna
Hi,

I've only been working with Flink for the past 2 weeks on a project and am
trying to use the CEP library on sensor data. I am using Flink version
1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running
Flink on a 3-node AWS cluster with 8G of RAM running Ubuntu 16.04. From the
Flink dashboard, I see that I have 2 TaskManagers & 4 Task slots.

What I observe is the following. The input to Kafka is a json string and
when parsed on the flink side, it looks like this

(101,Sun Sep 24 23:18:53 UTC 2017,complex
event,High,37.75142,-122.39458,12.0,20.0)

I use a Tuple8 to capture the parsed data. The first field is home_id. The
time characteristic is set to EventTime and I have an
AscendingTimestampExtractor using the timestamp field. The parallelism
of the execution environment is set to 4. I have a rather simple event
that I am trying to capture:

DataStream<Tuple8<...>> cepMapByHomeId = cepMap.keyBy(0);

//cepMapByHomeId.print();

Pattern<Tuple8<...>, ?> cep1 =
        Pattern.<Tuple8<...>>begin("start")
                .where(new OverLowThreshold())
                .followedBy("end")
                .where(new OverHighThreshold());

PatternStream<Tuple8<...>> patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);

DataStream<Tuple8<...>> alerts = patternStream.select(new PackageCapturedEvents());

The pattern checks if the 7th field in the tuple8 goes over 12 and then
over 16. The output of the pattern is like this

(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex
event,Non-event,37.75837,-122.41467)
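
The OverLowThreshold / OverHighThreshold conditions are not shown in the post; a
minimal sketch of what they could look like, where the Tuple8 type arguments are
assumptions and f6 is the 7th field compared against the 12.0 / 16.0 thresholds:

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

class OverLowThreshold
        extends SimpleCondition<Tuple8<Integer, Date, String, String, Double, Double, Double, Double>> {
    @Override
    public boolean filter(Tuple8<Integer, Date, String, String, Double, Double, Double, Double> event) {
        // fires once the 7th field rises above the lower threshold
        return event.f6 > 12.0;
    }
}

class OverHighThreshold
        extends SimpleCondition<Tuple8<Integer, Date, String, String, Double, Double, Double, Double>> {
    @Override
    public boolean filter(Tuple8<Integer, Date, String, String, Double, Double, Double, Double> event) {
        // fires once the 7th field rises above the higher threshold
        return event.f6 > 16.0;
    }
}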

On the Kafka producer side, I am trying to send simulated data for around 100
homes, so the home_id would go from 0-100 and the input is keyed by
home_id. I have about 10 partitions in Kafka. The producer just loops going
through a csv file with a delay of about 100 ms between 2 rows of the csv
file. The data is exactly the same for all 100 of the csv files except for
home_id and the lat & long information. The timestamp is incremented by a
step of 1 sec. I start multiple processes to simulate data from different
homes.

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input
data. I barely see the events for about 4-5 of the home_id values. I do a
print before applying the pattern and after and I see all home_ids before
and only a tiny subset after. Since the data is exactly the same, I expect
all home_ids to be captured and written to my sink, which is Cassandra in this
case. I've looked through all available docs and examples but cannot seem
to get a fix for the problem.

I would really appreciate some guidance on how to understand and fix this.


Thank you,

Ajay


how many 'run -c' commands to start?

2017-09-27 Thread r. r.
Hello

I successfully ran a job with 'flink run -c', but this is for the local setup.

How should I proceed with a cluster? Will Flink automagically instantiate the
job on all servers? I hope I don't have to start 'flink run -c' on all machines.

New to Flink and big data, so sorry for the probably silly question.



Thanks!

Rob