Re: processWindowFunction

2018-08-16 Thread antonio saldivar
Hi Vino
Thank you for the information. I am actually using a trigger alert and a
ProcessWindowFunction to send my results, but whenever my window slides or ends
it sends the objects again and I am getting duplicated data.
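
One likely reason for the duplicates is that a sliding window assigner places each element into several overlapping windows (roughly size / slide of them), so a window function that forwards every element emits it once per window. The following plain-Java sketch models only that start-time arithmetic with a zero offset; the class and method names are made up for illustration and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: mirrors the start-time arithmetic of a sliding
// window assigner with zero offset. It does not use any Flink classes.
class SlidingWindowSketch {

    /** Returns the start times of all sliding windows that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10, slide 5: the element at t=12 belongs to two windows.
        System.out.println(windowStarts(12, 10, 5)); // prints [10, 5]
    }
}
```

If each element must reach the next step only once, possible options (depending on the job) are to emit one aggregated result per window instead of the raw elements, to deduplicate downstream, or to use a tumbling window where size equals slide.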

On Thu, Aug 16, 2018 at 22:05, vino yang wrote:

> Hi Antonio,
>
> Which results do you not want to get when each window is created?
> Examples of the use of ProcessWindowFunction are included in many test
> files in Flink's project, such as SideOutputITCase.scala or
> WindowTranslationTest.scala.
>
> For more information on ProcessWindowFunction, you can refer to the
> official website.[1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>
> Thanks, vino.
>
> On Fri, Aug 17, 2018 at 6:24 AM, antonio saldivar wrote:
>
>> Hello
>>
>> I am implementing a data stream that uses sliding windows, but I am stuck:
>> I need to set values on my object based on some if statements in my process
>> function and send the object to the next step, but I don't want results
>> every time a window is created.
>>
>> If anyone has a good example of this, it would help me.
>>
>


Re: Kafka connector issue

2018-08-16 Thread TechnoMage
It looks like it is some issue with backpressure as the same behavior happens 
with the client library as a custom source.

Michael

> On Aug 16, 2018, at 6:59 PM, TechnoMage  wrote:
> 
> I have seen this in the past and am running into it again.
> 
> I have a Kafka consumer that is not getting all the records from the topic.
> Kafka confirms there are 300k messages in each partition, but Flink only sees
> a total of 8000 records in the source.
> 
> Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.
> 
>  Properties props = new Properties();
>  props.setProperty("bootstrap.servers", servers);
>  props.setProperty("group.id", UUID.randomUUID().toString());
>  props.setProperty("flink.partition-discovery.interval-millis", "1");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  DataStream ds = consumers.get(a.eventType);
>  if (ds == null) {
>    FlinkKafkaConsumer011 cons = new FlinkKafkaConsumer011(
>        topic, new SimpleStringSchema(), props);
>    cons.setStartFromEarliest();
>    ds = env.addSource(cons).name(et.name).rebalance();
>    consumers.put(a.eventType, ds);
>  }
> 
> I am about to rip out the kafka consumer and build a source using the client 
> library which has been 100% reliable in working with kafka.  Any pointers 
> welcome.
> 
> Michael
> 



Need a clarification about removing a stateful operator

2018-08-16 Thread Tony Wei
Hi all,

I'm confused about the description in the documentation [1]:


   - *Removing a stateful operator:* The state of the removed operator is
   lost unless another operator takes it over. When starting the upgraded
   application, you have to explicitly agree to discard the state.

Does that mean that if I take a full snapshot (e.g. a savepoint) after restoring
with explicit agreement to discard the state, the state won't exist in that
snapshot? Or does it just mean the state is ignored but still exists forever,
unless I explicitly purge it by using a state operator?

And could this behavior differ between state backends (Memory, FS, RocksDB)?

Many thanks,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#application-topology


Looking for flink code example using flink-jpmml library over DataStream

2018-08-16 Thread sagar loke
Hi,

We are planning to use Flink to run JPMML models over a DataStream, using the
flink-jpmml library (from Radicalbit).

Is there any code example we can refer to in order to kick-start the process?

Thanks,


Re: processWindowFunction

2018-08-16 Thread vino yang
Hi Antonio,

Which results do you not want to get when each window is created?
Examples of the use of ProcessWindowFunction are included in many test
files in Flink's project, such as SideOutputITCase.scala or
WindowTranslationTest.scala.

For more information on ProcessWindowFunction, you can refer to the
official website.[1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction

Thanks, vino.

On Fri, Aug 17, 2018 at 6:24 AM, antonio saldivar wrote:

> Hello
>
> I am implementing a data stream that uses sliding windows, but I am stuck:
> I need to set values on my object based on some if statements in my process
> function and send the object to the next step, but I don't want results
> every time a window is created.
>
> If anyone has a good example of this, it would help me.
>


Re: watermark does not progress

2018-08-16 Thread Hequn Cheng
Hi Jo,

Thanks for letting us know.

Best, Hequn

On Fri, Aug 17, 2018 at 2:12 AM, John O  wrote:

> Just wanted to post an update on this.
>
>
>
> The problem was my dataset. I was using a Kafka topic with multiple partitions
> but only generated data for a single key. This meant that in a
> parallelism>1 environment, some source subtasks never receive any data and
> therefore never emit a watermark. After keyBy, the next operator has to
> choose which watermark to use from its multiple inputs (the lowest value),
> so the watermark never progresses.
>
> Jo
>
> *From:* Hequn Cheng 
> *Sent:* Wednesday, August 15, 2018 6:38 AM
> *To:* John O 
> *Cc:* Fabian Hueske ; vino yang ;
> user 
>
> *Subject:* Re: watermark does not progress
>
>
>
> Hi John,
>
>
>
> I guess the source data in your local setup differs from that on the cluster.
> As Fabian said, it is probable that some partitions don't carry data.
> As a check, you can set the job parallelism to 1 and look at the result.
>
>
>
> Best, Hequn
>
>
>
> On Wed, Aug 15, 2018 at 5:22 PM, John O  wrote:
>
> I did some more testing.
>
> Below is a pseudo version of my setup.
>
> kafkaconsumer->
> assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)->
> process(print1 ctx.timerService().currentWatermark()) ->
> keyBy(_.someProp) ->
> process(print2 ctx.timerService().currentWatermark()) ->
>
> I am manually sending monotonically increasing (eventtime ) records to
> kafka topic.
>
> In print1 I see the expected watermark,
>
> but print2 is always Long.MIN_VALUE.
>
> It looks like keyBy wipes out the watermark.
>
>
>
> Now, if I run the exact same code on a flink cluster, print2 outputs
> expected watermark.
>
>
>
> Jo
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Wednesday, August 15, 2018 2:07 AM
> *To:* vino yang 
> *Cc:* John O ; user 
> *Subject:* Re: watermark does not progress
>
>
>
> Hi John,
>
>
>
> Watermarks cannot make progress if you have stream partitions that do not
> carry any data.
>
> What kind of source are you using?
>
>
>
> Best,
>
> Fabian
>
>
>
> 2018-08-15 4:25 GMT+02:00 vino yang :
>
> Hi Johe,
>
>
>
> In local mode, it should also work.
>
> When you debug, you can set a breakpoint in the getCurrentWatermark method
> to see if you can enter the method and if the behavior is what you expect.
>
> What is your source? If you post your code, it might be easier to locate.
>
> In addition, for debugging watermarks, you can also refer to this
> email[1].
>
> [1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Debugging-watermarks-td7065.html
>
>
>
> Thanks, vino.
>
>
>
> On Wed, Aug 15, 2018 at 9:44 AM, John O wrote:
>
> I am noticing that the watermark does not progress as expected when running
> locally in the IDE. It just stays at Long.MIN_VALUE.
>
>
>
> I am using EventTime processing and have tried both these time extractors.
>
> · assignAscendingTimestamps ...
>
> · assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ...
>
>
>
> I also configured the environment like so:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>
>
> If I run the job on a flink cluster, I do see the watermark progress.
>
>
>
> Is watermarking not supported in local mode?
>
>
>
> Thanks
>
> Jo
>
>
>
>
>


Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-16 Thread vino yang
Hi Madhav,

Can you set the log level to DEBUG in the log4j-client configuration file?
Then please post the log; I can try to locate the problem through it.

Thanks, vino.

On Fri, Aug 17, 2018 at 1:27 AM, makelkar wrote:

> Hi Vino,
> We should not have to specify the class name using the -c option to run a
> job in detached mode. I tried that this morning but it also didn't work.
>
> The flink CLI always starts in interactive mode and somehow ignores the
> -yd option specified in yarn-cluster mode. Can someone verify this please?
> If that's the case, it's a bug in the flink CLI.
>
> I have an ugly workaround where I start the flink CLI in the background, and
> I would like to avoid doing that.
>
> Thanks,
> Madhav.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Kafka connector issue

2018-08-16 Thread TechnoMage
I have seen this in the past and am running into it again.

I have a Kafka consumer that is not getting all the records from the topic.
Kafka confirms there are 300k messages in each partition, but Flink only sees a
total of 8000 records in the source.

Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.

  Properties props = new Properties();
  props.setProperty("bootstrap.servers", servers);
  props.setProperty("group.id", UUID.randomUUID().toString());
  props.setProperty("flink.partition-discovery.interval-millis", "1");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DataStream ds = consumers.get(a.eventType);
  if (ds == null) {
    FlinkKafkaConsumer011 cons = new FlinkKafkaConsumer011(
        topic, new SimpleStringSchema(), props);
    cons.setStartFromEarliest();
    ds = env.addSource(cons).name(et.name).rebalance();
    consumers.put(a.eventType, ds);
  }

I am about to rip out the kafka consumer and build a source using the client 
library which has been 100% reliable in working with kafka.  Any pointers 
welcome.

Michael



processWindowFunction

2018-08-16 Thread antonio saldivar
Hello

I am implementing a data stream that uses sliding windows, but I am stuck:
I need to set values on my object based on some if statements in my process
function and send the object to the next step, but I don't want results
every time a window is created.

If anyone has a good example of this, it would help me.


InvalidTypesException: Type of TypeVariable 'K' in 'class X' could not be determined

2018-08-16 Thread Miguel Coimbra
Hello,

I have some code which compiles correctly (Flink 1.4) under Java 8.
It uses generic types.
While it compiles correctly, the execution fails with the error:

org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'K' in 'class X' could not be determined.

This is my main:

public static void main(final String[] args) {
    X<Long> x = new X<Long>();
}


This is my class X:

public class X<K> {

    public X() {
        TypeInformation<K> keySelector = TypeInformation.of(new TypeHint<K>(){});
    }
}


Perhaps I'm lacking knowledge of the way Java's generics work, but why
can't Flink determine the TypeVariable 'K'?
As I am instantiating X parameterized with Long, that information should
eventually reach Flink, and the constructor of X would be equivalent to this:

public X() {
    TypeInformation<Long> keySelector = TypeInformation.of(new TypeHint<Long>(){});
}

During execution, however, this error pops up.
What am I missing here, and what is the best way to achieve this generic
behavior in a Flink-idiomatic way?

Thank you very much for your time.
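
A note on the mechanics behind the question above: a TypeHint-style anonymous subclass records whatever type argument appears in the source at the point where it is written. Inside a generic class that argument is the type variable K itself, because the Long chosen by the caller is erased at runtime. The sketch below reproduces this with plain Java reflection. Hint and Box are hypothetical stand-ins introduced only for this illustration (they are not Flink classes), and the workaround shown, handing in a hint created where the type is concrete, is one common approach rather than the only one.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a TypeHint-style class; not Flink's implementation.
abstract class Hint<T> {
    /** Reads the type argument recorded in the anonymous subclass's signature. */
    Type captured() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

// Hypothetical generic class playing the role of X in the question.
class Box<K> {
    // Records the type variable K itself, whatever K is at the call site.
    Type fromTypeVariable() {
        return new Hint<K>() {}.captured();
    }

    // Workaround sketch: the caller passes a hint built where the type is concrete.
    static Type fromConcreteHint(Hint<?> hint) {
        return hint.captured();
    }
}

class TypeHintSketch {
    public static void main(String[] args) {
        Type erased = new Box<Long>().fromTypeVariable();
        System.out.println(erased instanceof TypeVariable); // true: only "K" is known

        Type concrete = Box.fromConcreteHint(new Hint<Long>() {});
        System.out.println(concrete == Long.class);         // true
    }
}
```

Applied to the code in the question, the analogous change would be to let the caller supply the type information (for example as a constructor argument of X) instead of creating the hint inside X.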


Running Flink in multiple AWS availability zones

2018-08-16 Thread Jamie Grier
Hi all,

I'm looking to learn if/how others are running Flink jobs in such a way
that they can survive failure of a single Amazon AWS availability zone.

If you're currently doing this I would love a reply detailing your setup.

Thanks!

-Jamie


RE: watermark does not progress

2018-08-16 Thread John O
Just wanted to post an update on this.

The problem was my dataset. I was using a Kafka topic with multiple partitions but
only generated data for a single key. This meant that in a parallelism>1
environment, some source subtasks never receive any data and therefore never emit
a watermark. After keyBy, the next operator has to choose which watermark to use
from its multiple inputs (the lowest value), so the watermark never progresses.
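
The behaviour described above can be modelled in a few lines of plain Java. This is a simplified sketch, not Flink's implementation: it assumes each input channel reports its latest watermark and that a channel which has never received data stays at Long.MIN_VALUE. The class and method names are invented for the example.

```java
import java.util.Arrays;

// Simplified model of how an operator with several input channels derives
// its watermark. An illustration only, not Flink's implementation.
class WatermarkSketch {

    /** The operator's watermark is the minimum over all input channels. */
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three source subtasks; only the first one ever receives records.
        long[] oneActive = {1_534_377_600_000L, Long.MIN_VALUE, Long.MIN_VALUE};
        System.out.println(combinedWatermark(oneActive) == Long.MIN_VALUE); // true

        // Once every partition carries data, the watermark can advance.
        long[] allActive = {1_534_377_600_000L, 1_534_377_590_000L, 1_534_377_595_000L};
        System.out.println(combinedWatermark(allActive)); // 1534377590000
    }
}
```

This also matches the earlier suggestion in the thread: with parallelism 1 there is only one input channel, so an idle partition cannot hold the minimum back.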


Jo



From: Hequn Cheng 
Sent: Wednesday, August 15, 2018 6:38 AM
To: John O 
Cc: Fabian Hueske ; vino yang ; user 

Subject: Re: watermark does not progress

Hi John,

I guess the source data in your local setup differs from that on the cluster. As
Fabian said, it is probable that some partitions don't carry data.
As a check, you can set the job parallelism to 1 and look at the result.

Best, Hequn

On Wed, Aug 15, 2018 at 5:22 PM, John O wrote:
I did some more testing.
Below is a pseudo version of my setup.

kafkaconsumer->
assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)->
process(print1 ctx.timerService().currentWatermark()) ->
keyBy(_.someProp) ->
process(print2 ctx.timerService().currentWatermark()) ->

I am manually sending monotonically increasing (eventtime ) records to kafka 
topic.

In print1 I see the expected watermark,
but print2 is always Long.MIN_VALUE.

It looks like keyBy wipes out the watermark.

Now, if I run the exact same code on a flink cluster, print2 outputs expected 
watermark.

Jo

From: Fabian Hueske
Sent: Wednesday, August 15, 2018 2:07 AM
To: vino yang
Cc: John O; user
Subject: Re: watermark does not progress

Hi John,

Watermarks cannot make progress if you have stream partitions that do not carry 
any data.
What kind of source are you using?

Best,
Fabian

2018-08-15 4:25 GMT+02:00 vino yang:
Hi Johe,

In local mode, it should also work.
When you debug, you can set a breakpoint in the getCurrentWatermark method to
see whether the method is entered and whether the behavior is what you expect.
What is your source? If you post your code, the problem might be easier to locate.
In addition, for debugging watermarks, you can also refer to this email[1].

[1]: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Debugging-watermarks-td7065.html

Thanks, vino.

On Wed, Aug 15, 2018 at 9:44 AM, John O wrote:
I am noticing that the watermark does not progress as expected when running
locally in the IDE. It just stays at Long.MIN_VALUE.

I am using EventTime processing and have tried both these time extractors.

• assignAscendingTimestamps ...

• assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ...

Also, configured the environment as so
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

If I run the job on a flink cluster, I do see the watermark progress.

Is watermarking not supported in local mode?

Thanks
Jo




Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-16 Thread Ken Krugler
Hi Piotr,

Thanks, and darn it that’s something I should have noticed.

— Ken
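
For readers skimming the thread, the ordering pitfall Piotr describes in the quoted reply can be illustrated with a tiny stand-in. MiniHarness below is a hypothetical class written only for this sketch; it is not the Flink test harness and its methods merely imitate the described behaviour, where calling open() first silently sets up empty state and a later initializeState() has no visible effect.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature harness used only to illustrate the ordering pitfall.
// It is not the Flink test harness API.
class MiniHarness {
    private Map<String, Long> timers;   // null until state has been initialized
    private boolean opened;

    void initializeState(Map<String, Long> saved) {
        if (opened) {
            // Too late: open() already set up empty state. Silently ignored,
            // imitating the "no error" behaviour described in the thread.
            return;
        }
        timers = new HashMap<>(saved);
    }

    void open() {
        if (timers == null) {
            timers = new HashMap<>();   // implicit initialization without any state
        }
        opened = true;
    }

    int timerCount() {
        return timers.size();
    }

    public static void main(String[] args) {
        Map<String, Long> saved = new HashMap<>();
        saved.put("timer", 10L);

        MiniHarness wrong = new MiniHarness();
        wrong.open();
        wrong.initializeState(saved);
        System.out.println(wrong.timerCount());  // 0: restored timers are missing

        MiniHarness right = new MiniHarness();
        right.initializeState(saved);
        right.open();
        System.out.println(right.timerCount());  // 1
    }
}
```

In the unit test from the original question, the corresponding fix would be to call initializeState(savedState) before open() when building the second harness.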


> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski  wrote:
> 
> Hi,
> 
> You made a small mistake when restoring from state using the test harness, one
> that I have also made myself in the past. The problem is the ordering of these
> calls:
> 
> result.open();
> if (savedState != null) {
>     result.initializeState(savedState);
> }
> 
> open() is supposed to be called after initializeState(). If you look into the
> code of AbstractStreamOperatorTestHarness#open, you will see that if it is
> called before initializeState, it initializes the harness without any state.
> 
> Unfortunately, this is implicit behaviour that doesn't throw any error
> (the test harness is not part of Flink's public API). I will try to fix this:
> https://issues.apache.org/jira/browse/FLINK-10159
> 
> 
> Piotrek
> 
>> On 16 Aug 2018, at 00:24, Ken Krugler wrote:
>> 
>> Hi all,
>> 
>> It looks to me like the OperatorSubtaskState returned from 
>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>> had been registered via registerProcessingTimeTimer but had not yet fired 
>> when the snapshot was saved.
>> 
>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>> 
>> If not, is there anything special I need to do when setting up the test 
>> harness to ensure that timers are saved?
>> 
>> Below is the unit test, which shows how the test harness is being set up and 
>> run.
>> 
>> The TimerFunction used in this test does seem to be doing the right thing, 
>> as using it in a simple job on a local Flink cluster works as expected when 
>> creating & then restarting from a savepoint.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> ==
>> TimerTest.java
>> ==
>> package com.scaleunlimited.flinkcrawler.functions;
>> 
>> import static org.junit.Assert.assertTrue;
>> 
>> import java.util.ArrayList;
>> import java.util.List;
>> 
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>> import org.junit.Before;
>> import org.junit.Test;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>> 
>> public class TimerTest {
>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>> 
>>     private List _firedTimers = new ArrayList();
>> 
>>     @Before
>>     public void setUp() throws Exception {
>>     }
>> 
>>     @Test
>>     public void testTimerSaving() throws Throwable {
>> 
>>         // This operator doesn't really do much at all, but the first element
>>         // it processes will create a timer for (timestamp+1).
>>         // Whenever that timer fires, it will create another timer for
>>         // (timestamp+1).
>>         KeyedProcessOperator operator =
>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>> 
>>         // Create a test harness from scratch
>>         OneInputStreamOperatorTestHarness testHarness =
>>             makeTestHarness(operator, null);
>> 
>>         // We begin at time zero
>>         testHarness.setProcessingTime(0);
>> 
>>         // Process some elements, which should also create a timer for time 1.
>>         int inputs[] = new int[] {1, 2, 3};
>>         for (int input : inputs) {
>>             testHarness.processElement(new StreamRecord<>(input));
>>         }
>> 
>>         // Force some time to pass, which should keep moving the timer ahead,
>>         // finally leaving it set for time 10.
>>         for (long i = 1; i < 10; i++) {
>>             testHarness.setProcessingTime(i);
>>         }
>> 
>>         // Save the state, which we assume should include the timer we set for
>>         // time 10.
>>         OperatorSubtaskState savedState =
>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>> 
>>         // Close the first test harness
>>         testHarness.close();
>> 
>>         // Create a new test harness using the saved state (which we assume
>>         // includes the timer for time 10).
>>         testHarness = makeTestHarness(operator, savedState);
>> 
>>         // Force more time to pass, which should keep moving the timer ahead.
>>         for (long i = 10; i < 20; i++) {
>> 

Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-16 Thread makelkar
Hi Vino,
We should not have to specify the class name using the -c option to run a
job in detached mode. I tried that this morning but it also didn't work.

The flink CLI always starts in interactive mode and somehow ignores the
-yd option specified in yarn-cluster mode. Can someone verify this please?
If that's the case, it's a bug in the flink CLI.

I have an ugly workaround where I start the flink CLI in the background, and
I would like to avoid doing that.

Thanks,
Madhav.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Scala 2.12 Support

2018-08-16 Thread Timo Walther

Hi Aaron,

We just released Flink 1.6, and the discussion of the roadmap for 1.7
will begin soon. I guess the Jira issue will also be updated then. I would
recommend watching it for now.


Regards,
Timo


On 16.08.18 at 17:08, Aaron Levin wrote:

Hi Piotr,

Thanks for the update. Glad to hear it's high on the priority list! 
I'm looking forward to the 1.7 update!


It may be worth having someone more official from the Flink team give 
an update on that ticket. It wasn't clear if the 1.7 comment from that 
user was just a reference to the fact that 1.6 had come out (or where 
they got that information). I know a few people have cited the ticket 
and concluded "not clear what's going on with Scala 2.12 support." If 
you have the bandwidth, a note from you or anyone else would be helpful!


Thanks again!

Best,

Aaron Levin

On Thu, Aug 16, 2018 at 6:04 AM, Piotr Nowojski wrote:


Hi,

Scala 2.12 support is high on our priority list and we hope to
have it included for the 1.7 release (as you can see in the ticket
itself), which should happen later this year.

Piotrek



On 15 Aug 2018, at 17:59, Aaron Levin wrote:

Hello!

I'm wondering if there is anywhere I can see Flink's roadmap for
Scala 2.12 support. The last email I can find on the list about
this was back in January, and FLINK-7811 [0], the ticket
asking for Scala 2.12 support, hasn't been updated in a few months.

Recently Spark fixed the ClosureCleaner code to support Scala
2.12 [1], and from what I can gather this was one of the main
barriers to Flink supporting Scala 2.12. Given this has been
fixed, is there work in progress to support Scala 2.12? Any
updates on FLINK-7811?

Thanks for your help!

[0] https://issues.apache.org/jira/browse/FLINK-7811

[1] https://issues.apache.org/jira/browse/SPARK-14540


Best,

Aaron Levin







Re: Scala 2.12 Support

2018-08-16 Thread Aaron Levin
Hi Piotr,

Thanks for the update. Glad to hear it's high on the priority list! I'm
looking forward to the 1.7 update!

It may be worth having someone more official from the Flink team give an
update on that ticket. It wasn't clear if the 1.7 comment from that user
was just a reference to the fact that 1.6 had come out (or where they got
that information). I know a few people have cited the ticket and concluded
"not clear what's going on with Scala 2.12 support." If you have the
bandwidth, a note from you or anyone else would be helpful!

Thanks again!

Best,

Aaron Levin

On Thu, Aug 16, 2018 at 6:04 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Scala 2.12 support is high on our priority list and we hope to have it
> included for the 1.7 release (as you can see in the ticket itself), which
> should happen later this year.
>
> Piotrek
>
>
> On 15 Aug 2018, at 17:59, Aaron Levin  wrote:
>
> Hello!
>
> I'm wondering if there is anywhere I can see Flink's roadmap for Scala
> 2.12 support. The last email I can find on the list about this was back in
> January, and FLINK-7811 [0], the ticket asking for Scala 2.12 support,
> hasn't been updated in a few months.
>
> Recently Spark fixed the ClosureCleaner code to support Scala 2.12 [1], and
> from what I can gather this was one of the main barriers to Flink
> supporting Scala 2.12. Given this has been fixed, is there work in progress
> to support Scala 2.12? Any updates on FLINK-7811?
>
> Thanks for your help!
>
> [0] https://issues.apache.org/jira/browse/FLINK-7811
> [1] https://issues.apache.org/jira/browse/SPARK-14540
>
> Best,
>
> Aaron Levin
>
>
>


Re: How to submit flink job on yarn by java code

2018-08-16 Thread Piotr Nowojski
Hi,

Is this path accessible on the container? If not, use a distributed file
system, NFS, or the -yt/--yarnship option of the CLI.

Please also take a look at 
https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2...@mail.gmail.com%3E
 


Piotrek

> On 16 Aug 2018, at 11:05, spoon_lz <971066...@qq.com> wrote:
> 
> Sorry, I don't know why the code and error are not visible.
> The error is :
> The program finished with the following exception:
> 
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at flink.SubmitDemo.submit(SubmitDemo.java:75)
>   at flink.SubmitDemo.main(SubmitDemo.java:50)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment. 
> Diagnostics from YARN: Application application_1526888270443_0090 failed 2
> times due to AM Container for appattempt_1526888270443_0090_02 exited
> with  exitCode: -1000
> For more detailed output, check application tracking
> page: http://cluster1:8088/cluster/app/application_1526888270443_0090
> Then, click on links to logs of each attempt.
> Diagnostics: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
> java.io.FileNotFoundException: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>   at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>   at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
>   at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1526888270443_0090
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>   ... 5 more
> 
> and my code like :
> 
> public class SubmitDemo {
> 
>     private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
>     private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
>     private static final String JAR_FILE =
>         "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
> 
>     public static void main(String[] args) {
> 
>         SubmitDemo demo = new SubmitDemo();
>         demo.before();
>         List parameters = new ArrayList<>();
>         parameters.add("run");
>         parameters.add("-d");
>         parameters.add("-m");
>         parameters.add("yarn-cluster");
>         parameters.add("-ynm");
>         parameters.add("lz_test_alone");
>         parameters.add("-yn");
>         parameters.add("4");
>         parameters.add("-ytm");
>         parameters.add("4096");
> 

Re: How to compare two window ?

2018-08-16 Thread Piotr Nowojski
Hi,

Could you rephrase your question? Maybe by posting some code examples?

Piotrek

> On 16 Aug 2018, at 08:26, 苗元君  wrote:
> 
> Hi, Flink guys, 
> You really made a quick release, it's fantastic!
> 
> I've got a situation:
> window 1 is time driven, slice is 1 min, trigger is 1 count
> window 2 is count driven, slice is 3 count, trigger is 1 count
> 
> 1. An element that leaves window1 should go right into window2.
> For example, if there are only 2 elements, window2 will have no elements.
> How do I build windows like this?
>    I tried to use window1 with the structure (window, trigger, evictor) and then
> window2 with the structure (trigger, evictor),
>    but the elements are calculated in window1 and window2 at the same time.
> 
> 2. I tried to find ways to use SQL on an AllWindowedStream, but it does not seem
> to work. Can a SQL query be used on a window?
> 3. How do I compare these SQL results?
> 
> 
> 
> 
> Thank U so much.
> 
> -- 
> Yuanjun Miao
> 



Re: How to compare two window ?

2018-08-16 Thread Hequn Cheng
Hi miaoyuan,

> First question
I didn't quite catch your meaning. There is some documentation about using
windows in SQL [1] and the Table API [2] that is worth a look.

> Second question
Grouping by a window without other keys results in an AllWindowedStream. The SQL
looks like this:

> SELECT COUNT(*)
> , TUMBLE_START(rowtime, INTERVAL '15' MINUTE)
> , TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
> FROM MyTable
> GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
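
To make the semantics of the query above concrete, here is a plain-Java sketch of what a 15-minute tumbling-window COUNT(*) computes: every row falls into exactly one window, determined by rounding its rowtime down to a multiple of the window size. The class name and the sample timestamps are invented for illustration, and the code does not use the Flink Table API.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of what a 15-minute tumbling-window COUNT(*) computes.
// It does not use the Flink Table API.
class TumbleCountSketch {
    static final long WINDOW_MILLIS = 15 * 60 * 1000L;

    /** Maps each window start (epoch millis) to the number of rows in that window. */
    static Map<Long, Long> countPerWindow(long[] rowtimes) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : rowtimes) {
            // Round down to the start of the window containing this row.
            long windowStart = ts - Math.floorMod(ts, WINDOW_MILLIS);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] rowtimes = {0L, 60_000L, 899_999L, 900_000L};
        System.out.println(countPerWindow(rowtimes)); // {0=3, 900000=1}
    }
}
```

Here the map key plays the role of TUMBLE_START, and the key plus the window size would correspond to TUMBLE_END.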


> Third question
You can print or sink data into external storages and compare the result.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#group-windows

On Thu, Aug 16, 2018 at 2:26 PM, 苗元君  wrote:

> Hi, Flink guys,
> You really made a quick release, it's fantastic!
>
> I've got a situation:
> window 1 is time driven, slice is 1 min, trigger is 1 count
> window 2 is count driven, slice is 3 count, trigger is 1 count
>
> 1. An element that leaves window1 should go right into window2.
> For example, if there are only 2 elements, window2 will have no
> elements. *How do I build windows like this?*
>    I tried to use window1 with the structure (window, trigger, evictor) and then
> window2 with the structure (trigger, evictor),
>    but the elements are calculated in window1 and window2 at the same time.
>
> 2. I tried to find ways to use SQL on an AllWindowedStream, but it does not seem
> to work. Can a SQL query be used on a window?
> 3. How do I compare these SQL results?
>
>
>
> Thank U so much.
>
> --
>
> *Yuanjun Miao*
>


Re: Standalone cluster instability

2018-08-16 Thread Piotr Nowojski
Hi,

I’m not aware of such rules of thumb. Memory consumption is highly application
and workload specific. It depends on how much you allocate in your user
code and how much memory you keep in state (in the case of the heap state backend).
Basically, just as with most Java applications, you have to use trial and
error.

One good practice is, before any deployment, to test your Flink application on a
testing cluster that is identical to the production cluster, by (re)processing
some of the production workload/backlog/data (in parallel to the production
cluster).
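
As a starting point for the trial-and-error approach described above, the JVM's own view of its memory can be sampled with the standard MemoryMXBean. The sketch below formats the figures in a layout similar to the "Memory usage stats" log line quoted further down in this thread; the class name and formatting helper are made up for this example. Note that these numbers cover only JVM-managed heap and non-heap pools. The kernel log in this thread reports anon-rss of 3661216 kB (about 3575 MB) for a process whose maximum heap was 3072 MB, so the resident size seen by the operating system can be considerably larger than the heap figure.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Sketch that prints JVM memory in a layout similar to the
// "Memory usage stats" log line quoted in this thread.
class MemoryStatsSketch {

    static long toMb(long bytes) {
        // getMax() may return -1 when the limit is undefined; keep it as -1.
        return bytes < 0 ? -1 : bytes >> 20;
    }

    static String format(MemoryUsage heap, MemoryUsage nonHeap) {
        return String.format(
            "Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/committed/max)]",
            toMb(heap.getUsed()), toMb(heap.getCommitted()), toMb(heap.getMax()),
            toMb(nonHeap.getUsed()), toMb(nonHeap.getCommitted()), toMb(nonHeap.getMax()));
    }

    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        System.out.println(format(memory.getHeapMemoryUsage(), memory.getNonHeapMemoryUsage()));
    }
}
```

Comparing such samples with the process's resident set size as reported by the operating system, while replaying production-like data on a test cluster, gives a rough picture of how much memory a job needs beyond its heap.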

Piotrek 

> On 16 Aug 2018, at 13:23, Shailesh Jain  wrote:
> 
> Thank you for your help Piotrek.
> 
> I think it was a combination of a. other processes taking up available memory 
> and b. flink processes consuming all the memory allocated to them, that 
> resulted in kernel running out of memory.
> 
> Are there any heuristics or best practices which you (or anyone in the 
> community) recommend to benchmark memory requirements of a particular flink 
> job?
> 
> Thanks,
> Shailesh
> 
> 
> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski wrote:
> Hi,
> 
> Good that we are more or less on track with this problem :) But the problem 
> here is not that the heap size is too small, but that your kernel is running out 
> of memory and starts killing processes. Either:
> 
> 1. some other process is using the available memory 
> 2. Increase memory allocation on your machine/virtual machine/container/cgroup
> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease network 
> memory buffer pool). Of course for any given job/state 
> size/configuration/cluster size there is some minimal reasonable memory size 
> that you have to assign to Flink, otherwise you will have poor performance 
> and/or constant garbage collections and/or you will start getting OOM errors 
> from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on 
> a different level).
> 
> Piotrek
> 
> 
>> On 14 Aug 2018, at 07:36, Shailesh Jain wrote:
>> 
>> Hi Piotrek,
>> 
>> Thanks for your reply. I checked through the syslogs for that time, and I 
>> see this:
>> 
>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
>> process 2305 (java) score 468 or sacrifice child
>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 
>> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>> 
>> As you pointed out, kernel killed the task manager process.
>> 
>> If I had already set the max heap size for the JVM (to 3GB in this case), 
>> and the memory usage stats showed 2329MB being used 90 seconds earlier, it 
>> seems a bit unlikely for operators to consume 700 MB heap space in that 
>> short time, because our events ingestion rate is not that high (close to 10 
>> events per minute).
>> 
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage 
>> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB 
>> (used/committed/max)]
>> 
>> Is it possible to log individual operator's memory consumption? This would 
>> help in narrowing down on the root cause. There were around 50 operators 
>> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP 
>> operators).
>> 
>> Thanks,
>> Shailesh
>> 
>> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Please post full TaskManager logs, including stderr and stdout. (Have you 
>> checked the stderr/stdout for some messages?)
>> 
>> I can think of a couple of reasons:
>> 1. process segfault
>> 2. process killed by OS
>> 3. OS failure
>> 
>> 1. Should be visible as a message in the stderr/stdout file and can be caused
>> by, for example, a bug in the JVM, RocksDB or some other native library/code.
>> 2. Is your system maybe running out of memory? The kernel might kill the process
>> if that’s happening. You can also check the system (Linux?) logs for errors that
>> correlate in time. Where those logs are depends on your OS.
>> 3. This might be tricky, but I have seen kernel failures that prevented any
>> messages from being logged, for example. Besides this TaskManager failure, is
>> your machine operating normally without any other problems/crashes/restarts?
>> 
>> Piotrek
>> 
>>> On 10 Aug 2018, at 06:59, Shailesh Jain wrote:
>>> 
>>> Hi,
>>> 
>>> I hit a similar issue yesterday, the task manager died suspiciously, no 
>>> error logs in the task manager logs, but I see the following exceptions in 
>>> the job manager logs:
>>> 
>>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting  
>>> - Association to [akka.tcp://flink@localhost:34483] with 
>>> UID [328996232] irrecoverably failed. Quarantining address.
>>> java.util.concurrent.TimeoutException: Remote system has been silent for 
>>> too long. (more than 48.0 hours)

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-16 Thread Piotr Nowojski
Hi,

You made a small mistake when restoring from state using the test harness, one that I
have also made myself in the past. The problem is the ordering of these calls:

result.open();
if (savedState != null) {
result.initializeState(savedState);
}

open is supposed to be called after initializeState. If you look into the code of
AbstractStreamOperatorTestHarness#open, you will see that if it is called before
initializeState, it initializes the harness without any state.
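
The ordering rule above can be illustrated with a small plain-Java model. This is only a sketch, not Flink's test harness: MiniHarness and restore are hypothetical names, and the model assumes that an initializeState call arriving after open has no effect, which is a simplification of the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HarnessOrderingSketch {

    // Hypothetical stand-in for a test harness; it only tracks pending timers.
    static final class MiniHarness {
        private List<Long> timers = null; // null until state has been initialized

        // Restores timers from a snapshot; assumed to be ignored once state exists.
        void initializeState(List<Long> snapshot) {
            if (timers != null) {
                return; // too late: the harness was already initialized by open()
            }
            timers = new ArrayList<>(snapshot);
        }

        // Opens the harness; if no state was supplied yet, it starts with empty state.
        void open() {
            if (timers == null) {
                timers = new ArrayList<>();
            }
        }

        List<Long> pendingTimers() {
            return timers;
        }
    }

    // Runs the two calls in the requested order and returns the restored timers.
    public static List<Long> restore(List<Long> snapshot, boolean openFirst) {
        MiniHarness harness = new MiniHarness();
        if (openFirst) {
            harness.open();
            harness.initializeState(snapshot);
        } else {
            harness.initializeState(snapshot);
            harness.open();
        }
        return harness.pendingTimers();
    }

    public static void main(String[] args) {
        List<Long> snapshot = Collections.singletonList(10L);
        System.out.println("open first: " + restore(snapshot, true));        // prints "open first: []"
        System.out.println("initialize first: " + restore(snapshot, false)); // prints "initialize first: [10]"
    }
}
```

In the real test, the corresponding change is to call result.initializeState(savedState) before result.open().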

Unfortunately, this is implicit behaviour that doesn’t throw any error (the test
harness is not part of Flink’s public API). I will try to fix this:
https://issues.apache.org/jira/browse/FLINK-10159


Piotrek

> On 16 Aug 2018, at 00:24, Ken Krugler  wrote:
> 
> Hi all,
> 
> It looks to me like the OperatorSubtaskState returned from 
> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
> had been registered via registerProcessingTimeTimer but had not yet fired 
> when the snapshot was saved.
> 
> Is this a known limitation of OneInputStreamOperatorTestHarness?
> 
> If not, is there anything special I need to do when setting up the test 
> harness to ensure that timers are saved?
> 
> Below is the unit test, which shows how the test harness is being set up and 
> run.
> 
> The TimerFunction used in this test does seem to be doing the right thing, as 
> using it in a simple job on a local Flink cluster works as expected when 
> creating & then restarting from a savepoint.
> 
> Thanks,
> 
> — Ken
> 
> ==
> TimerTest.java
> ==
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import static org.junit.Assert.assertTrue;
> 
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Before;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
> 
> public class TimerTest {
> public static final Logger LOGGER = 
> LoggerFactory.getLogger(TimerTest.class);
> 
> private List _firedTimers = new ArrayList();
> 
> @Before
> public void setUp() throws Exception {
> }
> 
> @Test
> public void testTimerSaving() throws Throwable {
> 
> // This operator doesn't really do much at all, but the first element
> // it processes will create a timer for (timestamp+1).
> // Whenever that timer fires, it will create another timer for 
> // (timestamp+1).
> KeyedProcessOperator operator = 
> new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
> 
> // Create a test harness from scratch
> OneInputStreamOperatorTestHarness testHarness = 
> makeTestHarness(operator, null);
> 
> // We begin at time zero
> testHarness.setProcessingTime(0);
> 
> // Process some elements, which should also create a timer for time 1.
> int inputs[] = new int[] {1, 2, 3};
> for (int input : inputs) {
> testHarness.processElement(new StreamRecord<>(input));
> }
> 
> // Force some time to pass, which should keep moving the timer ahead,
> // finally leaving it set for time 10.
> for (long i = 1; i < 10; i++) {
> testHarness.setProcessingTime(i);
> }
> 
> // Save the state, which we assume should include the timer we set for
> // time 10.
> OperatorSubtaskState savedState = 
> testHarness.snapshot(0L, testHarness.getProcessingTime());
> 
> // Close the first test harness
> testHarness.close();
> 
> // Create a new test harness using the saved state (which we assume
> // includes the timer for time 10).
> testHarness = makeTestHarness(operator, savedState);
> 
> // Force more time to pass, which should keep moving the timer ahead.
> for (long i = 10; i < 20; i++) {
> testHarness.setProcessingTime(i);
> }
> 
> // Close the second test harness and make sure all the timers we expect
> // actually fired.
> testHarness.close();
> for (long i = 1; i < 20; i++) {
> 
> // TODO This expectation currently fails, since Timers 

Re: Standalone cluster instability

2018-08-16 Thread Shailesh Jain
Thank you for your help Piotrek.

I think it was a combination of a. other processes taking up available
memory and b. flink processes consuming all the memory allocated to them,
that resulted in kernel running out of memory.

Are there any heuristics or best practices which you (or anyone in the
community) recommend to benchmark memory requirements of a particular flink
job?

Thanks,
Shailesh


On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski 
wrote:

> Hi,
>
> Good that we are more or less on track with this problem :) But the
> problem here is not that the heap size is too small, but that your kernel is
> running out of memory and starts killing processes. Either:
>
> 1. some other process is using the available memory
> 2. Increase memory allocation on your machine/virtual
> machine/container/cgroup
> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease
> network memory buffer pool). Of course for any given job/state
> size/configuration/cluster size there is some minimal reasonable memory
> size that you have to assign to Flink, otherwise you will have poor
> performance and/or constant garbage collections and/or you will start
> getting OOM errors from JVM (don’t confuse those with OS/kernel's OOM
> errors - those two are on a different level).
>
> Piotrek
>
>
> On 14 Aug 2018, at 07:36, Shailesh Jain 
> wrote:
>
> Hi Piotrek,
>
> Thanks for your reply. I checked through the syslogs for that time, and I
> see this:
>
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill
> process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305
> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>
> As you pointed out, kernel killed the task manager process.
>
> If I had already set the max heap size for the JVM (to 3GB in this case),
> and the memory usage stats showed 2329MB being used 90 seconds earlier, it
> seems a bit unlikely for operators to consume 700 MB heap space in that
> short time, because our events ingestion rate is not that high (close to 10
> events per minute).
>
> 2018-08-08 13:19:23,341 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage
> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
>
> Is it possible to log individual operator's memory consumption? This would
> help in narrowing down on the root cause. There were around 50 operators
> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP
> operators).
>
> Thanks,
> Shailesh
>
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Please post full TaskManager logs, including stderr and stdout. (Have you
>> checked the stderr/stdout for some messages?)
>>
>> I can think of a couple of reasons:
>> 1. process segfault
>> 2. process killed by OS
>> 3. OS failure
>>
>> 1. Should be visible as a message in the stderr/stdout file and can be
>> caused by, for example, a bug in the JVM, RocksDB or some other native library/code.
>> 2. Is your system maybe running out of memory? The kernel might kill the process
>> if that’s happening. You can also check the system (Linux?) logs for errors
>> that correlate in time. Where those logs are depends on your OS.
>> 3. This might be tricky, but I have seen kernel failures that prevented
>> any messages from being logged, for example. Besides this TaskManager
>> failure, is your machine operating normally without any other
>> problems/crashes/restarts?
>>
>> Piotrek
>>
>> On 10 Aug 2018, at 06:59, Shailesh Jain 
>> wrote:
>>
>> Hi,
>>
>> I hit a similar issue yesterday, the task manager died suspiciously, no
>> error logs in the task manager logs, but I see the following exceptions in
>> the job manager logs:
>>
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting
>> - Association to [
>> akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably
>> failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for
>> too long. (more than 48.0 hours)
>> at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 

Re: Flink SQL does not support rename after cast type

2018-08-16 Thread 徐涛
Hi Hequn,
Thanks a lot for your answer! The question is clear now.

Best
Henry

> On Aug 14, 2018, at 1:24 PM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> Flink does support renaming a column after casting.
> 
> The exception is not caused by the cast. It is caused by mixing types. For 
> example, the query 
> "CASE 1 WHEN 1 THEN true WHEN 2 THEN 'string' ELSE NULL END"
> will throw the same exception, since the types of true and 'string' are not the same.
> 
> Best, Hequn.
> 
> On Tue, Aug 14, 2018 at 12:51 PM, 徐涛 wrote:
> Hi All,
> I am working on a project based on Flink SQL, but found that I can't 
> rename a column after casting. The code is as below:
> cast(json_type as INTEGER) as xxx
> 
> And the following exception is reported:
> org.apache.calcite.runtime.CalciteContextException: From line 4, 
> column 6 to line 11, column 38: Illegal mixing of types in CASE or COALESCE 
> statement
> 
> Does Flink not support this? I think it is a common case. Is there a way
> to accomplish it?
> Thanks a lot.
> 
> Best
> Henry Xu
> 



Re: Scala 2.12 Support

2018-08-16 Thread Piotr Nowojski
Hi,

Scala 2.12 support is high on our priority list and we hope to have it included 
for the 1.7 release (as you can see in the ticket itself), which should happen 
later this year.

Piotrek

> On 15 Aug 2018, at 17:59, Aaron Levin  wrote:
> 
> Hello!
> 
> I'm wondering if there is anywhere I can see Flink's roadmap for Scala 2.12 
> support. The last email I can find on the list about this was back in January, 
> and FLINK-7811[0], the ticket asking for Scala 2.12 support, hasn't been 
> updated in a few months.
> 
> Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and 
> from what I can gather this was one of the main barriers to Flink supporting 
> Scala 2.12. Given this has been fixed, is there work in progress to support 
> Scala 2.12? Any updates on FLINK-7811?
> 
> Thanks for your help!
> 
> [0] https://issues.apache.org/jira/browse/FLINK-7811 
> 
> [1] https://issues.apache.org/jira/browse/SPARK-14540 
> 
> 
> Best,
> 
> Aaron Levin



Re: How to submit flink job on yarn by java code

2018-08-16 Thread spoon_lz
Sorry, I don't know why the code and error are not visible.
The error is :
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at flink.SubmitDemo.submit(SubmitDemo.java:75)
at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1526888270443_0090 failed 2 times due to AM Container for appattempt_1526888270443_0090_02 exited with  exitCode: -1000
For more detailed output, check application tracking page: http://cluster1:8088/cluster/app/application_1526888270443_0090 Then, click on links to logs of each attempt.
Diagnostics: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1526888270443_0090
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 5 more

and my code looks like this:

public class SubmitDemo {


private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
private static final String JAR_FILE =
"/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";


public static void main(String[] args) {

SubmitDemo demo = new SubmitDemo();
demo.before();
List parameters = new ArrayList<>();
parameters.add("run");
parameters.add("-d");
parameters.add("-m");
parameters.add("yarn-cluster");
parameters.add("-ynm");
parameters.add("lz_test_alone");
parameters.add("-yn");
parameters.add("4");
parameters.add("-ytm");
parameters.add("4096");
parameters.add("-yjm");
parameters.add("1024");
parameters.add("-c");
parameters.add("flink.Demo");
parameters.add(JAR_FILE);

try {
demo.submit(parameters.toArray(new String[parameters.size()]));
} catch (Exception e) {
e.printStackTrace();
}
}

public void submit(String[] args) throws Exception {

final String configurationDirectory = ENV_CONF;

File configFIle = new File(FLINK_CONF);

final Configuration flinkConfiguration =

Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-16 Thread vino yang
Hi Marvin777,

You are wrong. It uses the Flink on YARN single job mode and should use the
"-yd" parameter.

Hi Madhav,

I think I have found the problem. The source code that produces your log
output is here [1].

It depends on the check method "isUsingInteractiveMode".

The source code for this method is here [2]; it returns true when "program"
is null. The place where this field is left null is here [3].

So, from the source code point of view, I suggest you explicitly specify
the class that contains the main method in the CLI args.



[1]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L380

[2]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L276

[3]:
https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L217

Thanks, vino.

On Thu, Aug 16, 2018 at 11:00 AM, Marvin777 wrote:

> Hi, Madhav,
>
>
>> ./flink-1.4.2/bin/flink run -m yarn-cluster *-yd* -yn 2 -yqu "default"
>>  -ytm 2048 myjar.jar
>
>
> Modified to, ./flink-1.4.2/bin/flink run -m yarn-cluster -*d* -yn 2 -yqu
> "default"  -ytm 2048 myjar.jar
>
>
>
> On Thu, Aug 16, 2018 at 5:01 AM, madhav Kelkar wrote:
>
>> Hi there,
>>
>> I am trying to run a single flink job on YARN in detached mode. as
>> per the docs for flink 1.4.2, I am using -yd to do that.
>>
>> The problem I am having is the flink bash script doesn't terminate
>> execution and return until I press control + c. In detached mode, I would
>> expect the flink CLI to return as soon as yarn job is submitted. is there
>> something I am missing? here is exact output I get -
>>
>>
>>
>> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"
>>>  -ytm 2048 myjar.jar \
>>> program arguments omitted
>>>
>>>
>>> Using the result of 'hadoop classpath' to augment the Hadoop classpath:
>>> /Users/makelkar/work/hadoop-2.7.3/etc/hadoop:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/*:/Users/makelkar/work/hadoop-2.7.3/contrib/capacity-scheduler/*.jar
>>> 2018-08-15 14:39:36,873 INFO
>>>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
>>> for the flink jar passed. Using the location of class
>>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>>> 2018-08-15 14:39:36,873 INFO
>>>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
>>> for the flink jar passed. Using the location of class
>>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>>> 2018-08-15 14:39:36,921 INFO  org.apache.hadoop.yarn.client.RMProxy
>>> - Connecting to ResourceManager at /0.0.0.0:8032
>>> 2018-08-15 14:39:37,226 INFO
>>>  org.apache.flink.yarn.YarnClusterDescriptor   - Cluster
>>> specification: ClusterSpecification{masterMemoryMB=1024,
>>> taskManagerMemoryMB=2048, numberTaskManagers=2, slotsPerTaskManager=1}
>>> 2018-08-15 14:39:37,651 WARN
>>>  org.apache.flink.yarn.YarnClusterDescriptor   - The
>>> configuration directory ('/Users/makelkar/work/flink/flink-1.4.2/conf')
>>> contains both LOG4J and Logback configuration files. Please delete or
>>> rename one of them.
>>> 2018-08-15 14:39:37,660 INFO  org.apache.flink.yarn.Utils
>>> - Copying from
>>> file:/Users/makelkar/work/flink/flink-1.4.2/conf/logback.xml to
>>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/logback.xml
>>>
>>> 2018-08-15 14:39:37,986 INFO  org.apache.flink.yarn.Utils
>>> - Copying from
>>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/log4j-1.2.17.jar to
>>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/log4j-1.2.17.jar
>>> 2018-08-15 14:39:38,011 INFO  org.apache.flink.yarn.Utils
>>> - Copying from
>>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
>>> to
>>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-dist_2.11-1.4.2.jar
>>> 2018-08-15 14:39:38,586 INFO  org.apache.flink.yarn.Utils
>>> - Copying from
>>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar
>>> to
>>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-python_2.11-1.4.2.jar
>>> 2018-08-15 14:39:38,603 INFO  org.apache.flink.yarn.Utils
>>> - Copying from
>>> 

How to compare two window ?

2018-08-16 Thread 苗元君
Hi, Flink guys,
You really did a quick release, it's fantastic!

I have the following situation:
window 1 is time driven, slice is 1 min, trigger is 1 count
window 2 is count driven, slice is 3 count, trigger is 1 count

1. An element leaves window1 and then goes straight into window2.
For example, if there are only 2 elements, window2 will contain no
elements. *How can I build windows like this?*
   I tried building window1 as (window, trigger, evictor) followed by
window2 as (trigger, evictor), but the elements were computed in window1
and window2 at the same time.

2. I tried to find a way to use SQL on an AllWindowedStream, but it does
not seem to work. Can a SQL query be used on a window?
3. How can I compare these SQL results?

Thank U so much.

-- 

*Yuanjun Miao*