Re: Flink 1.11.2 test cases fail with Scala 2.12.12

2021-02-23 Thread Chesnay Schepler

Could you check your dependency tree for the version of scala-library?






Re: Window Process function is not getting trigger

2021-02-23 Thread sagar
It is a fairly simple requirement. If I change it to processing time it
works fine, but it does not work with event time. Help appreciated!


-- 
---Regards---

  Sagar Bandal

This is confidential mail. All rights are reserved. If you are not the intended
recipient, please ignore this email.


Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread Roman Khachatryan
The watermark resolution in Flink is one millisecond [1], so the 1st form
essentially doesn't allow out-of-orderness (though the elements with the
same timestamp are not considered late in this case).

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html
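
To illustrate the point about the 1 ms resolution, here is a minimal plain-Java sketch (no Flink dependency). It is a simplified model, not Flink's implementation: it assumes the watermark is the highest timestamp seen so far minus the configured delay, and that an event counts as late when its timestamp is less than or equal to the current watermark. The class and method names are made up for illustration.

```java
final class WatermarkDelaySketch {

    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkDelaySketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Current watermark: highest timestamp seen so far minus the configured delay.
    long watermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMillis;
    }

    // Feeds one event timestamp (ms) and reports whether it arrived at or behind the watermark.
    boolean isLate(long timestampMillis) {
        boolean late = timestampMillis <= watermark();
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
        return late;
    }

    public static void main(String[] args) {
        // Models rowtime_column - INTERVAL '0.001' SECOND (a delay of 1 ms).
        WatermarkDelaySketch ascending = new WatermarkDelaySketch(1);
        System.out.println(ascending.isLate(1_000)); // first event
        System.out.println(ascending.isLate(1_000)); // same timestamp again
        System.out.println(ascending.isLate(999));   // one millisecond older

        // Models rowtime_column - INTERVAL '5' SECOND (a delay of 5000 ms).
        WatermarkDelaySketch bounded = new WatermarkDelaySketch(5_000);
        System.out.println(bounded.isLate(10_000));  // first event
        System.out.println(bounded.isLate(6_000));   // 4 s older, inside the bound
        System.out.println(bounded.isLate(5_000));   // 5 s older, at the watermark
    }
}
```

Under these assumptions, a 1 ms delay tolerates a repeated timestamp but nothing older, while a larger interval tolerates events that trail the maximum by less than that interval.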

Regards,
Roman


On Wed, Feb 24, 2021 at 7:54 AM joris.vanagtmaal <
joris.vanagtm...@wartsila.com> wrote:

> Thanks Roman,
>
> somehow i must have missed this in the documentation.
>
> What is the difference (if any) between:
>
> Ascending timestamps:
> WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.
>
> Bounded out of orderness timestamps:
> WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string'
> timeUnit.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Reading and writing Hive via plain DDL

2021-02-23 Thread Rui Li
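That depends on your own metadata management system. On the Flink side, you only need to implement the Catalog interfaces to connect to your system. For example, in your implementation of Catalog::createTable you could add an authorization check that decides whether the user is allowed to create the table.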

On Wed, Feb 24, 2021 at 11:14 AM silence  wrote:

> Then, with a custom catalog, how do I define Hive tables to read and write Hive?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Datastream Lag Windowing function

2021-02-23 Thread Roman Khachatryan
Hi,

I can see neither the wrong nor the expected output in your message. Can you
re-attach it?
Could you provide the code of your pipeline including the view creation?
Are you using Blink planner (can be chosen by useBlinkPlanner [1])?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#main-differences-between-the-two-planners

Regards,
Roman


On Sun, Feb 21, 2021 at 9:40 AM s_penakalap...@yahoo.com <
s_penakalap...@yahoo.com> wrote:

> Hi All,
>
> I am using Flink 1.12. I am trying to read real-time data from a Kafka topic
> and, as per the requirement, I need to implement a windowed LAG function.
> The approach I followed is below:
>
> DataStream vData = env.addSource(...)
> vData.keyBy(Id)
> createTemporaryView
> then apply Flink SQL.
>
> My sample data is like below. The vTime field contains the timestamp when the
> event was generated, and vNumSeq is the unique number for a particular group Id.
>
> I tried the LAG function ordering by the vSeq field (long datatype). The job failed
> with "OVER windows' ordering in stream mode must be defined on a time
> attribute".
>
> I also tried using the vTime field (eventTS is also a long datatype). I tried
> converting this field to sql.Timestamp; still no luck, the job failed with the
> same error.
>
> The documents I referred to suggested using proctime/rowtime, so I modified
> the query to use proctime(). The job succeeded, but with wrong results.
>
> Kindly help with a simple example; I am badly stuck. I am OK with using the
> DataStream API to implement the lag functionality.
>
> Lag Query:
> select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as
> vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY
> vdata.f0 ORDER BY proctime()) AS prevSal from VData vData
>
> Wrong output :
>
>
> Expected:
>
>
> Regards,
> Sunitha.
>
>
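
As a rough illustration of what a per-key lag amounts to, here is a minimal plain-Java sketch (no Flink dependency). It is only a model: the map stands in for whatever keyed state a streaming operator would keep, the class and method names are hypothetical, and "previous" means previous in arrival order, which is also what ordering by proctime() gives.

```java
import java.util.HashMap;
import java.util.Map;

final class LagPerKeySketch {

    // Last value seen for each key; a keyed streaming operator would hold this in keyed state.
    private final Map<String, Double> previousByKey = new HashMap<>();

    // Returns the previous value for the key (defaultValue for the first record)
    // and remembers the current value for the next call.
    double lag(String key, double current, double defaultValue) {
        Double previous = previousByKey.put(key, current);
        return previous == null ? defaultValue : previous;
    }

    public static void main(String[] args) {
        LagPerKeySketch sketch = new LagPerKeySketch();
        System.out.println(sketch.lag("id-1", 100.0, 0.0));
        System.out.println(sketch.lag("id-1", 150.0, 0.0));
        System.out.println(sketch.lag("id-2", 70.0, 0.0));
        System.out.println(sketch.lag("id-1", 120.0, 0.0));
    }
}
```

Because the sketch follows arrival order, records that arrive out of sequence-number order would produce a different "previous" value than ordering by vSeq would, which may be one source of unexpected results.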


Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread bat man
Hi,

This is my code below -
As mentioned earlier, the rulesStream is used again in later processing.
Below you can see that the rulesStream is connected again with the result stream
of the first process function. Do you think this is the reason the rules
operator's state is getting overwritten when the data in Kafka is deleted?
My question is: if the data is no longer present in Kafka, then no data is read into
the stream, so how is the existing state being updated?

public static final MapStateDescriptor rulesDescriptor =
new MapStateDescriptor<>(
"rules", BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(Rule.class));

 rawEventKafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
DataStream rawEventStream =
validateData(getRawEventStream(rawEventKafkaSource,env));

 rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
 DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);

 deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
 DataStream deviceDataStream = getDeviceStream(deviceSource,env);

 BroadcastStream rulesStream = rulesDataStream.broadcast(rulesDescriptor);

 SingleOutputStreamOperator>
keyedSingleOutputStream =
 rawEventStream.
 connect(rulesStream).
 process(new
DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

 SingleOutputStreamOperator rtEventDataStream =
 keyedSingleOutputStream.
 keyBy((keyed) -> keyed.getKey()).
 connect(rulesStream).
 process(new
DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic
> and overwrites the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>
>> Hi,
>>
>> I have 2 streams one event data and the other rules. I broadcast the
>> rules stream and then key the data stream on event type. The connected
>> stream is processed thereafter.
>> We faced an issue where the rules data in the topic got deleted because
>> of Kafka retention policy.
>> Post this the existing rules data also got dropped in the broadcast state
>> and the processing stopped.
>>
>> As per my understanding the rules which were present in broadcast state
>> should still exist even if the data was deleted in Kafka, as the rules data
>> was already processed and stored in the state map.
>>
>> PS: I’m reusing the rules stream as broadcast later in processing as
>> well. Could this be an issue?
>>
>> Thanks,
>> Hemant
>>
>


Re: Flink 1.11.2 test cases fail with Scala 2.12.12

2021-02-23 Thread soumoks
Thank you for the response but this error continues to happen with Scala
2.12.7.
The app itself continues to compile without errors but the test cases fail
with the same error.

Seems to be related to 
https://issues.apache.org/jira/browse/FLINK-12461

I have set the Scala version in pom.xml file and I have used this property
value for all dependencies present.


1.8
1.8
UTF-8
2.12.7
2.12
1.11.461
4.2.0


 


However, I ran into the same error 

org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9
  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
  at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
  at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
  at org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)




and it occurs if I use a Scala mutable/immutable map data-structure in my
code.

Sample test code:

val data: scala.collection.mutable.Map[String, String] =
  scala.collection.mutable.Map("key1" -> "v1", "key2" -> "v2")

val kafkaMsg: String = write(
  SampleCaseClass(14L, "23FC", 10L, data)
)

val stream: DataStream[SampleCaseClass] =
  env.addSource(new MockKafkaSource(kafkaMsg)).
flatMap(new SampleCaseClassMapper)


I am including part of pom file related to test packages for reference.


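<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest_${scala.compat.version}</artifactId>
  <version>3.2.3</version>
  <scope>test</scope>
</dependency>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>4.4.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.22.2</version>
      <configuration>
        <skipTests>true</skipTests>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest-maven-plugin</artifactId>
      <version>2.0.2</version>
      <configuration>
        <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
        <junitxml>.</junitxml>
        <filereports>TestSuiteReport.txt</filereports>
      </configuration>
      <executions>
        <execution>
          <id>test</id>
          <goals>
            <goal>test</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>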









--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink custom trigger use case

2021-02-23 Thread Khachatryan Roman
Hi Diwakar,

I'm not sure I fully understand your question.
If event handling in one window depends on some other windows, then
TriggerContext.getPartitionedState cannot be used. Triggers don't have
access to the global state (only to key-window scoped state).
If that's what you want then please consider ProcessWindowFunction [1]
where you can use context.globalState() in your process function.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
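
To show why the scope of the state matters here, below is a minimal plain-Java model (no Flink dependency). It is not the Flink API: the two sets merely stand in for state scoped to key and window versus state scoped to the key alone, and all names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class FirstSeenScopeSketch {

    // Stands in for a "first seen" flag kept in key-and-window scoped state.
    private final Set<String> seenPerKeyAndWindow = new HashSet<>();

    // Stands in for a "first seen" flag kept in per-key state that outlives windows.
    private final Set<String> seenPerKey = new HashSet<>();

    // True for the first element of every (key, window) pair.
    boolean firstInKeyAndWindow(String key, long windowStart) {
        return seenPerKeyAndWindow.add(key + "@" + windowStart);
    }

    // True only for the very first element of the key, regardless of window.
    boolean firstForKey(String key) {
        return seenPerKey.add(key);
    }

    public static void main(String[] args) {
        FirstSeenScopeSketch sketch = new FirstSeenScopeSketch();
        long[] windowStarts = {0L, 0L, 60_000L, 60_000L, 60_000L}; // records 1..5, same key
        for (long windowStart : windowStarts) {
            System.out.printf("window %d: per key+window first=%b, per key first=%b%n",
                    windowStart,
                    sketch.firstInKeyAndWindow("k", windowStart),
                    sketch.firstForKey("k"));
        }
    }
}
```

In this model the key-and-window scoped flag is fresh again for the first record of the second window, which corresponds to the behaviour described in the question, while the per-key flag stays set after the first record.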

Regards,
Roman


On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha  wrote:

>
> Hello,
>
> I'm trying to use a custom trigger for one of my use case. I have a basic
> logic (as shown below) of using keyBy on the input stream and using a
> window of 1 min.
>
> .keyBy()
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(new CustomTrigger())
> .aggregate(Input.getAggregationFunction(), new
> AggregationProcessingWindow());
>
>
> My custom trigger is expected to fire the first event of the keyBy
> instantly and any subsequent events should be aggregated in the window.
>
> .trigger(new Trigger() {
>> @Override
>> public TriggerResult onElement(Record record, long l, TimeWindow
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> ValueState firstSeen =
>> triggerContext.getPartitionedState(firstSceenDescriptor);
>> if(firstSeen.value() == null) {
>> firstSeen.update(true);
>> // fire trigger to early evaluate window and purge that event.
>> return TriggerResult.FIRE_AND_PURGE;
>> }
>> // Continue. Do not evaluate window per element
>> return TriggerResult.CONTINUE;
>> }
>> @Override
>> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>> TriggerContext triggerContext) throws Exception {
>> // final evaluation and purge window state
>> return TriggerResult.FIRE_AND_PURGE;
>> }
>> @Override
>> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>> TriggerContext triggerContext) throws Exception {
>> return TriggerResult.CONTINUE;
>> }
>> @Override
>> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>> throws Exception {
>>
>> }
>> })
>
>
>
>
> Currently, I see (for each window and same key) the first event of the
> window is always fired. But I want to see this happening for only the first
> window and for the subsequent window it should aggregate all the events and
> then fire.
>
> Example : all the records have the same key.
> current output.
> record 1 : first event in the window-1 : fired record 2 : last event in
> the window-1 : fired record 3 : first event in the window-2 : fired record
> 4, record 5 : - 2 events in the window-2 : fired.
>
> expected output.
> record 1 : first event in the window-1 : fired record 2 : last event in
> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
> window-2 should not fire the first event of the same key.
>
> I'm reading it here
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
> but not able to solve it. Any pointers would be helpful.
>
> Thanks.
>


Re: Window Process function is not getting trigger

2021-02-23 Thread sagar
Hi,

I corrected it with the code below, but I am still getting the same issue.

Instant instant =
p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
long timeInMillis = instant.toEpochMilli();
System.out.println(timeInMillis);
return timeInMillis;




Re: Window Process function is not getting trigger

2021-02-23 Thread Kezhu Wang
I saw one potential issue. Your timestamp assigner returns timestamp in
second resolution while Flink requires millisecond resolution.
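
To make the resolution point concrete, here is a minimal plain-Java sketch (no Flink dependency). It assumes UTC rather than the system default zone so that the result is reproducible, and it assumes tumbling windows are aligned as `ts - (ts % size)` for non-negative timestamps with no offset. The class and method names are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class TimestampResolutionSketch {

    // Size of the tumbling window from the thread: 2 seconds, expressed in milliseconds.
    static final long WINDOW_SIZE_MS = 2_000L;

    // What the original assigner returned: seconds since the epoch.
    static long epochSeconds(LocalDateTime t) {
        return t.toEpochSecond(ZoneOffset.UTC);
    }

    // What an event-time assigner is expected to return: milliseconds since the epoch.
    static long epochMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the window containing a non-negative timestamp (assumed alignment, zero offset).
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        LocalDateTime first = LocalDateTime.parse("2019-12-31T00:00:00");
        for (int i = 0; i < 5; i++) {
            LocalDateTime t = first.plusSeconds(i);
            System.out.printf("%s  seconds -> window %d   millis -> window %d%n",
                    t, windowStart(epochSeconds(t)), windowStart(epochMillis(t)));
        }
    }
}
```

When second values are interpreted as milliseconds, the five sample events are only 1 ms apart and land in a single window; with millisecond values they spread over several 2-second windows.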


Best,
Kezhu Wang



Window Process function is not getting trigger

2021-02-23 Thread sagar
I have a simple Flink streaming program that uses a socket as its
continuous source.
I have a window size of 2 seconds.

Somehow my window process function is not triggering, and even if I pass
events in any order, Flink does not ignore any of them.

I can see the output only when I kill my socket. Please find the code
snippet below.

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Price> price = env.socketTextStream("localhost", 9998)
        .uid("price source")
        .map(new MapFunction<String, Price>() {
            @Override
            public Price map(String s) throws Exception {
                String[] f = s.split(",");
                return new Price(f[0], LocalDate.parse(f[1]), new BigDecimal(f[2]),
                        new BigDecimal(f[3]), f[4], new BigDecimal(f[5]),
                        LocalDateTime.parse(f[6]));
            }
        });

DataStream<Price> priceStream = price
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
                .withTimestampAssigner((p, timestamp) -> {
                    ZoneId zoneId = ZoneId.systemDefault();
                    long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
                    System.out.println(epoch);
                    return epoch;
                }))
        .keyBy(new KeySelector<Price, String>() {
            @Override
            public String getKey(Price price) throws Exception {
                return price.getPerformanceId();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
            @Override
            public void process(String s, Context context,
                    Iterable<Price> iterable, Collector<Price> collector) throws Exception {
                System.out.println(context.window().getStart()
                        + "Current watermark: " + context.window().getEnd());
                Price p1 = null;
                for (Price p : iterable) {
                    System.out.println(p.toString());
                    p1 = p;
                }
                collector.collect(p1);
            }
        });

priceStream.writeAsText("c:\\ab.txt");

The data I am inputting is:

p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01



Flink on YARN per-job: job submission failure

2021-02-23 Thread 凌战
Hi, community,
On the API side I set the user to the hdfs user. After the job was scheduled and executed, I found the related packages under the /user/hdfs/.flink/application-id directory, for example:
-rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02 /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
-rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
-rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
-rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09 /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar


However, it fails with the error: Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
I noticed that the permissions of the uploaded files are -rw-r--r--, and I am not sure whether the failure is caused by a permission problem.


Hoping someone can clear this up!

凌战
m18340872...@163.com

Re: Reading and writing Hive via plain DDL

2021-02-23 Thread silence
Then, with a custom catalog, how do I define Hive tables to read and write Hive?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Install/Run Streaming Anomaly Detection R package in Flink

2021-02-23 Thread Wei Zhong
Hi Robert,

If you do not want to install the library on every machine of the cluster, the
Python dependency management API can be used to upload the required
dependencies to the cluster and use them there.

For this case, I recommend building a portable Python environment that contains
all the required dependencies. You can call `add_python_archive` to upload the
environment to your cluster and call `set_python_executable` to set the path of the
Python interpreter in your cluster.

For more detailed information, you can refer to the following link.

Documentation of the Python dependency management API and configuration:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
 

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-archives
 


How to build a portable python environment:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/faq.html#preparing-python-virtual-environment
 


Best,
Wei

> On Feb 24, 2021, at 01:38, Roman Khachatryan wrote:
> 
> Hi,
> 
> I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.
> 
> Regards,
> Roman
> 
> 
> On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen wrote:
> My customer wants us to install this package in our Flink Cluster:
> 
> https://github.com/twitter/AnomalyDetection 
> 
> 
> One of our engineers developed a python version:
> 
> https://pypi.org/project/streaming-anomaly-detection/ 
> 
> 
> Is there a way to install this in our cluster?
> 
> -- 
> Robert Cullen
> 240-475-4490



flinksql1.11??????????????UDAF??????????No match found for function signature prod()

2021-02-23 Thread Presley
Hi,all:
flinksql1.11hiveUDAFUDAFbug??_(:??)_

-
public class Product0 {
public Double prod = 1D;
}

public class ProductUdaf0 extends AggregateFunction

Re: Does the Kafka source perform retractions on Key?

2021-02-23 Thread Rex Fenley
Apologies, forgot to finish. If the Kafka source performs its own
retractions of old data on key (user_id) for every append it receives, it
should resolve this discrepancy I think.

Again, is this true? Anything else I'm missing?

Thanks!




-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: About the long duration of the initialize phase newly added in 1.12

2021-02-23 Thread yidan zhao
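Does anyone understand this problem? Today I restarted again; after 5 min it is
still in the initializing phase, and the client side exits directly with an
exception (it reports Caused by:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  wrote on Sun, Feb 7, 2021 at 4:00 PM:

> A screenshot cannot capture the dynamic process either.
>
> Currently this is a 10-machine standalone cluster with about 5 GB of state. I submit the job through flink-client, then the web UI keeps spinning on refresh; after a while (roughly tens of seconds) it is OK. At the moment it becomes OK there are many jobs in the Initialize state, which then gradually (within about 10 s) disappear.
>
> On the flink-client side, the submission sometimes completes normally and sometimes reports an error (something like a duplicate job).
>
>
> zilong xiao wrote on Sun, Feb 7, 2021 at 3:25 PM:
>
>> Do you have a screenshot?
>>
>> 赵一旦 wrote on Sun, Feb 7, 2021 at 3:13 PM:
>>
>> > There is now another symptom of this problem: when I submit a job, the web
>> > UI seems to hang; after a while the job shows up on refresh, with 4-5 jobs in the initialize state, which then disappear one after another within a few seconds, leaving 1.
>> >
>> > I currently suspect that some retry mechanism causes the job to be submitted N times, and that perhaps some deduplication mechanism then stops several of them automatically?
>> >
>> > 赵一旦 wrote on Tue, Jan 26, 2021 at 10:51 AM:
>> >
>> > > As above, I have found that jobs which used to go from typing the command to running quickly (within 10-30 s) now sometimes take 1-2 min in the initialize phase. I don't know what is going on.
>> > >
>> >
>>
>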


Does the Kafka source perform retractions on Key?

2021-02-23 Thread Rex Fenley
Hi,

I'm concerned about the impacts of Kafka's compactions when sending data
between running flink jobs.

For example, one job produces retract stream records in the sequence
(false, (user_id: 1, state: "california")) -- retract
(true, (user_id: 1, state: "ohio")) -- append
When this is written to Kafka and keyed by user_id, it could end up
compacted to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state == "california"
and reads from the Kafka stream, I assume it will miss the retract message
altogether and produce incorrect results.
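
The concern can be modelled with a minimal plain-Java sketch (no Kafka or Flink dependency). It only simulates the scenario described above and does not establish how Kafka or Flink behave in practice: compaction is modelled as keeping the latest record per key, the downstream job is assumed to have already counted an earlier append for "california", and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {

    // One changelog record: append == true for an append, false for a retract.
    static final class Change {
        final boolean append;
        final int userId;
        final String state;

        Change(boolean append, int userId, String state) {
            this.append = append;
            this.userId = userId;
            this.state = state;
        }
    }

    // Modelled compaction: only the latest record per key (user_id) survives.
    static List<Change> compact(List<Change> log) {
        Map<Integer, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.userId);
            latest.put(c.userId, c);
        }
        return new ArrayList<>(latest.values());
    }

    // Downstream job with a filter on state: appends add one, retracts subtract one.
    static int applyFiltered(int startingCount, List<Change> changes, String state) {
        int count = startingCount;
        for (Change c : changes) {
            if (c.state.equals(state)) {
                count += c.append ? 1 : -1;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Records the downstream job has not read yet.
        List<Change> unread = Arrays.asList(
                new Change(false, 1, "california"),
                new Change(true, 1, "ohio"));
        // The downstream job already counted user 1 in california, so it starts at 1.
        System.out.println("uncompacted: " + applyFiltered(1, unread, "california"));
        System.out.println("compacted:   " + applyFiltered(1, compact(unread), "california"));
    }
}
```

In this model the uncompacted tail brings the "california" count back to zero, while the compacted tail leaves the stale count in place because the retract record is gone.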

Is this true? How do we prevent this from happening? We need to use
compaction since all our jobs are based on CDC and we can't just drop data
after x number of days.

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



ValidationException: SQL validation failed.No match found for function signature count_uadf()

2021-02-23 Thread Presley
??dei??_(:??)_



 ?? 2020??12??179:00 <[hidden email] ??

 flink??1.11.1
 udaf
 
 ??
 public class TestSql {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
         //env.setParallelism(3);

         tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

         Properties configs = CommonUtils.getConfigs();
         //clazz
         FlinkUtils.registerMysqlTable2FlinkTable(
                 tableEnv, configs.getProperty("url"),
                 configs.getProperty("user.name"), configs.getProperty("password"),
                 "test", "clazz_lesson");

         Table table = tableEnv.sqlQuery(
                 "select count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number");
         //Table table = tableEnv.sqlQuery("select number,collect(extension_value) from clazz_extension group by number ");
         tableEnv.toRetractStream(table, Row.class).print();
         env.execute();
     }
 }

 


 public class CountUdaf extends AggregateFunction

Flink jobs organization and maintainability

2021-02-23 Thread Sweta Kalakuntla
Hi,

I am going to have to implement many similar jobs. I need guidance and
examples that you may have for organizing them in the Git repository
without having to have one repo per job.

Thanks,
SK

--


Re: Flink custom trigger use case

2021-02-23 Thread Diwakar Jha
Hi Roman,

Thanks for your reply! That was a typo; I'm using
TumblingProcessingTimeWindows.
My problem is that I want to stop the first-event trigger (per key) in every
window except the first one. Right now, the first event fires in
every window. Will checking that the "*state (firstSeen) value is true, not just
exists*" also change the result per window?

Thanks!
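
A rough sketch of one possible reason the first event fires in every window, assuming (as the Flink Javadoc for TriggerContext.getPartitionedState describes) that trigger state is scoped to both the key and the window, so each new window starts without a firstSeen value. This is a plain-Java simulation, not Flink code; `TriggerScopeSketch` and `firesImmediately` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

final class TriggerScopeSketch {
    // Remembers which scopes have already seen their first element.
    private final Set<String> seen = new HashSet<>();
    private final boolean scopedPerWindow;

    TriggerScopeSketch(boolean scopedPerWindow) {
        this.scopedPerWindow = scopedPerWindow;
    }

    // Returns true when the element would be fired immediately, i.e. the
    // FIRE_AND_PURGE branch of onElement in the trigger from this thread.
    boolean firesImmediately(String key, long windowStart) {
        String scope = scopedPerWindow ? key + "@" + windowStart : key;
        // Set.add returns true only the first time this scope is seen.
        return seen.add(scope);
    }
}
```

With `scopedPerWindow` set to true, the first element of window 2 fires again, which matches the behaviour reported here. With state scoped to the key alone, only the very first element for that key fires early. How to obtain key-only scope in a real job is outside what this sketch shows.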

On Tue, Feb 23, 2021 at 12:05 PM Roman Khachatryan  wrote:

> Hi,
>
> I've noticed that you are using an event time window, but the trigger
> fires based on processing time.
> You should also register an event time timer (for the window end). So that
> trigger.onEventTime() will be called.
> And it's safer to check if the state (firstSeen) value is true, not just
> exists.
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha 
> wrote:
>
>>
>> Hello,
>>
>> I'm trying to use a custom trigger for one of my use case. I have a basic
>> logic (as shown below) of using keyBy on the input stream and using a
>> window of 1 min.
>>
>> .keyBy()
>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> .trigger(new CustomTrigger())
>> .aggregate(Input.getAggregationFunction(), new
>> AggregationProcessingWindow());
>>
>>
>> My custom trigger is expected to fire the first event of the keyBy
>> instantly and any subsequent events should be aggregated in the window.
>>
>> .trigger(new Trigger<Record, TimeWindow>() {
>>>     @Override
>>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         ValueState<Boolean> firstSeen =
>>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>>         if (firstSeen.value() == null) {
>>>             firstSeen.update(true);
>>>             // fire trigger to early evaluate window and purge that event.
>>>             return TriggerResult.FIRE_AND_PURGE;
>>>         }
>>>         // Continue. Do not evaluate window per element
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         // final evaluation and purge window state
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>             TriggerContext triggerContext) throws Exception {
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>>             throws Exception {
>>>     }
>>> })
>>
>>
>>
>>
>> Currently, I see (for each window and same key) the first event of the
>> window is always fired. But I want to see this happening for only the first
>> window and for the subsequent window it should aggregate all the events and
>> then fire.
>>
>> Example : all the records have the same key.
>> current output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3 : first event in the window-2 : fired record
>> 4, record 5 : - 2 events in the window-2 : fired.
>>
>> expected output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>> window-2 should not fire the first event of the same key.
>>
>> I'm reading it here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>> but not able to solve it. Any pointers would be helpful.
>>
>> Thanks.
>>
>


Re: Flink custom trigger use case

2021-02-23 Thread Roman Khachatryan
Hi,

I've noticed that you are using an event time window, but the trigger fires
based on processing time.
You should also register an event time timer (for the window end) so that
trigger.onEventTime() will be called.
It is also safer to check that the state (firstSeen) value is true, not just
that it exists.

Regards,
Roman


On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha  wrote:

>
> Hello,
>
> I'm trying to use a custom trigger for one of my use case. I have a basic
> logic (as shown below) of using keyBy on the input stream and using a
> window of 1 min.
>
> .keyBy()
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(new CustomTrigger())
> .aggregate(Input.getAggregationFunction(), new
> AggregationProcessingWindow());
>
>
> My custom trigger is expected to fire the first event of the keyBy
> instantly and any subsequent events should be aggregated in the window.
>
> .trigger(new Trigger<Record, TimeWindow>() {
>>     @Override
>>     public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         ValueState<Boolean> firstSeen =
>>                 triggerContext.getPartitionedState(firstSceenDescriptor);
>>         if (firstSeen.value() == null) {
>>             firstSeen.update(true);
>>             // fire trigger to early evaluate window and purge that event.
>>             return TriggerResult.FIRE_AND_PURGE;
>>         }
>>         // Continue. Do not evaluate window per element
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         // final evaluation and purge window state
>>         return TriggerResult.FIRE_AND_PURGE;
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>             throws Exception {
>>     }
>> })
>
>
>
>
> Currently, I see (for each window and same key) the first event of the
> window is always fired. But I want to see this happening for only the first
> window and for the subsequent window it should aggregate all the events and
> then fire.
>
> Example : all the records have the same key.
> current output.
> record 1 : first event in the window-1 : fired record 2 : last event in
> the window-1 : fired record 3 : first event in the window-2 : fired record
> 4, record 5 : - 2 events in the window-2 : fired.
>
> expected output.
> record 1 : first event in the window-1 : fired record 2 : last event in
> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
> window-2 should not fire the first event of the same key.
>
> I'm reading it here
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
> but not able to solve it. Any pointers would be helpful.
>
> Thanks.
>


Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread Roman Khachatryan
Hi,

You can use watermark strategy with bounded out of orderness in DDL, please
refer to [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
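
For intuition, a bounded-out-of-orderness strategy keeps the watermark a fixed delay behind the largest timestamp seen so far. The following plain-Java sketch shows only that arithmetic; it is not Flink or PyFlink API, and the class name and the 5-second bound used in the usage note are illustrative assumptions.

```java
final class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Records an event timestamp; out-of-order (smaller) timestamps never lower the maximum.
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Current watermark: the largest timestamp seen minus the allowed out-of-orderness.
    // Before any event is seen there is no meaningful watermark yet.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxOutOfOrdernessMillis;
    }

    // An event counts as late if its timestamp is not after the current watermark.
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }
}
```

With a bound of 5000 ms, an event at 10000 ms moves the watermark to 5000 ms; a subsequent event at 8000 ms is out of order but still on time, while one at 4000 ms would count as late.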

Regards,
Roman


On Tue, Feb 23, 2021 at 12:48 PM joris.vanagtmaal <
joris.vanagtm...@wartsila.com> wrote:

> I worked out the rowtype input for the conversion to datastream;
>
> type_info = Types.ROW_NAMED(['sender', 'stw', 'time'],[Types.STRING(),
> Types.DOUBLE(), Types.LONG()])
> datastream=table_env.to_append_stream(my_table, type_info)
>
> But if i try to assign rowtime and watermarks to the datastream and convert
> it back to a table, the rowtime/watermarks either get dropped or maybe they
> are not created properly.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Install/Run Streaming Anomaly Detection R package in Flink

2021-02-23 Thread Roman Khachatryan
Hi,

I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.

Regards,
Roman


On Mon, Feb 22, 2021 at 3:01 PM Robert Cullen  wrote:

> My customer wants us to install this package in our Flink Cluster:
>
> https://github.com/twitter/AnomalyDetection
>
> One of our engineers developed a python version:
>
> https://pypi.org/project/streaming-anomaly-detection/
>
> Is there a way to install this in our cluster?
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Julia API/Interface for Flink

2021-02-23 Thread Khachatryan Roman
Hi,

AFAIK there is no direct support for Julia in Flink currently.
However, you may try to call Python from Julia using either Statefun Python
SDK [1] or PyFlink [2]; or implement a remote Statefun module [3].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/python.html
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/
[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/

Regards,
Roman


On Mon, Feb 22, 2021 at 8:49 PM Beni Bilme  wrote:

> Hello,
>
> Is there a julia api or interface for using flink?
>
> Thanks in advance for any response.
>
> Beni
>
>
>


Re: [Statefun] Dynamic behavior

2021-02-23 Thread Seth Wiesman
I don't think there is anything statefun specific here and I would follow
Igal's advice.

Let's say you have a state value called `Behavior` that describes the
behavior of an instance. There is a default behavior but any given instance
may have a customized behavior. What I would do is the following.

Create a state in the TransactionManager called `behavior` that stores the
instance's customized behavior if it exists. When a transaction comes in,
read the behavior state. If it exists (is not None in the case of Python)
then use that. If not, then fall back to the default instance.
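
A minimal sketch of that lookup in plain Java, with a map standing in for the per-instance `behavior` state (this is not the StateFun SDK; `BehaviorLookupSketch`, `customize`, `behaviorFor` and the default value are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

final class BehaviorLookupSketch {
    // Hypothetical default used when an instance has no customized behavior.
    static final String DEFAULT_BEHAVIOR = "default-rules";

    // Stand-in for the per-instance `behavior` state: instance id -> customized behavior.
    private final Map<String, String> behaviorState = new HashMap<>();

    // Stores a customized behavior for one instance.
    void customize(String instanceId, String behavior) {
        behaviorState.put(instanceId, behavior);
    }

    // On each transaction: use the instance's customized behavior if it exists,
    // otherwise fall back to the default.
    String behaviorFor(String instanceId) {
        return behaviorState.getOrDefault(instanceId, DEFAULT_BEHAVIOR);
    }
}
```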

The default instance can be provided one of several ways depending on the
specifics of your use case:

1) Hard-coded in the function.
2) Dynamically loaded via a background thread as a global. So long as that
default is immutable, this is safe.
3) Dynamically loaded via the function instance on first use. Stateful
functions have strong support for making async requests, so you could simply
query the behavior for that instance on first use from a 3rd party service.
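
Option 2 could look roughly like the sketch below: an immutable default held in an AtomicReference and swapped by a background thread. It uses only the JDK; `DefaultBehaviorHolder`, the `loader` supplier (standing in for a call to some external service) and the refresh period are assumptions for illustration, not part of StateFun.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DefaultBehaviorHolder {
    // Latest immutable default; replaced atomically so readers never see a partial update.
    private final AtomicReference<String> current;
    // Hypothetical loader, e.g. a request to a third-party service.
    private final Supplier<String> loader;

    DefaultBehaviorHolder(String initial, Supplier<String> loader) {
        this.current = new AtomicReference<>(initial);
        this.loader = loader;
    }

    // Fetches the latest default once and publishes it.
    void refreshOnce() {
        current.set(loader.get());
    }

    // Starts a daemon thread that refreshes the default at a fixed period.
    ScheduledExecutorService startBackgroundRefresh(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "default-behavior-refresh");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::refreshOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    // Called by function instances whenever they need the default.
    String get() {
        return current.get();
    }
}
```

Because readers only ever call `get()` and the stored value is immutable, no locking is needed on the read path. Note that scheduleAtFixedRate stops rescheduling if a run throws, so a real loader would likely need its own error handling.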

Seth


On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo  wrote:

> Hi Seth,
>
> Thanks for your comment. I've seen that repository in the past and it was
> really helpful to "validate" that this was the way to go.
> I think my question is not being addressed there though: how could one add
> dynamic behavior to your TransactionManager? In this case, state that is
> available to all TransactionManager instances when they receive a message
> of type Transaction for the first time.
>
> Seth Wiesman  wrote on Tuesday, 23/02/2021 at 16:02:
>
>> Hey Miguel,
>>
>> What you are describing is exactly what is implemented in this repo. The
>> TransactionManager function acts as an orchestrator to work with the other
>> functions. The repo is structured as an exercise but the full solution
>> exists on the branch `advanced-solution`.
>>
>> https://github.com/ververica/flink-statefun-workshop
>>
>> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo 
>> wrote:
>>
>>> Another possibility I am considering is handling this in Flink using a
>>> broadcast and adding all the information needed to the event itself. I'm a
>>> little concerned about the amount of data that will be serialized and sent
>>> on every request though, as I'll need to include information about all
>>> available remote functions, for instance.
>>>
>>> Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:
>>>
>>>> Hi Gordon, Igal,
>>>>
>>>> Thanks for your replies.
>>>> PubSub would be a good addition, I have a few scenarios where that
>>>> would be useful.
>>>>
>>>> However, after reading your answers I realized that your proposed
>>>> solutions (which address the most obvious interpretation of my question) do
>>>> not necessarily solve my problem. I should have just stated what it was,
>>>> instead of trying to propose a solution by discussing broadcast...
>>>>
>>>> I'm trying to implement an "orchestrator" function which, given an
>>>> event, will trigger multiple remote function calls, aggregate their results
>>>> and eventually call yet more functions (based on a provided dependency
>>>> graph). Hence, this orchestrator function has state per event_id and each
>>>> function instance is short-lived (a couple seconds at most, ideally
>>>> sub-second). The question then is not about how to modify a long-running
>>>> function instance (which PubSub would enable), but rather how to have the
>>>> dependency graph available to new functions.
>>>>
>>>> Given this, Igal's answer seems promising because we have the
>>>> FunctionProvider instantiating a local variable and passing it down on
>>>> every instantiation. I'm assuming there is one FunctionProvider per
>>>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>>>> data coming from a Flink DataStream, or receiving StateFun messages?
>>>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>>>
>>>> I really appreciate your help.
>>>>
>>>> Miguel
>>>>
>>>> Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:
>>>>
>>>>> Hi Miguel,
>>>>>
>>>>> I think that there are a couple of ways to achieve this, and it really
>>>>> depends on your specific use case, and the trade-offs
>>>>> that you are willing to accept.
>>>>>
>>>>> For example, one way to approach this:
>>>>> - Suppose you have an external service somewhere that returns a
>>>>> representation of the logic to be interpreted by
>>>>> your function at runtime (I think that is the scenario you are
>>>>> describing)
>>>>> - Then, you can write a background task (a thread) that periodically
>>>>> queries that service, and keeps in memory the latest version.
>>>>> - You can initialize this background task in your FunctionProvider
>>>>> implementation, or even in your StatefulModule if you wish.
>>>>> - Then, make sure that your dynamic stateful function has an access to
>>>>> the latest value fetched by your client (for example via a shared
>>>>> reference

Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread Khachatryan Roman
Hi,

Deletion of messages in Kafka shouldn't affect Flink state in general.
Probably some operator in your pipeline is re-reading the topic
and overwriting the state, dropping what was deleted by Kafka.
Could you share the code?

Regards,
Roman


On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:

> Hi,
>
> I have 2 streams one event data and the other rules. I broadcast the rules
> stream and then key the data stream on event type. The connected stream is
> processed thereafter.
> We faced an issue where the rules data in the topic got deleted because of
> Kafka retention policy.
> Post this the existing rules data also got dropped in the broadcast state
> and the processing stopped.
>
> As per my understanding the rules which were present in broadcast state
> should still exist even if the data was deleted in Kafka as the rules data
> was already processed and stored in state map.
>
> PS: I’m reusing the rules stream as broadcast later in processing as well.
> Could this be an issue?
>
> Thanks,
> Hemant
>


Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Hi Seth,

Thanks for your comment. I've seen that repository in the past and it was
really helpful to "validate" that this was the way to go.
I think my question is not being addressed there though: how could one add
dynamic behavior to your TransactionManager? In this case, state that is
available to all TransactionManager instances when they receive a message
of type Transaction for the first time.

Seth Wiesman  wrote on Tuesday, 23/02/2021 at 16:02:

> Hey Miguel,
>
> What you are describing is exactly what is implemented in this repo. The
> TransactionManager function acts as an orchestrator to work with the other
> functions. The repo is structured as an exercise but the full solution
> exists on the branch `advanced-solution`.
>
> https://github.com/ververica/flink-statefun-workshop
>
> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo  wrote:
>
>> Another possibility I am considering is handling this in Flink using a
>> broadcast and adding all the information needed to the event itself. I'm a
>> little concerned about the amount of data that will be serialized and sent
>> on every request though, as I'll need to include information about all
>> available remote functions, for instance.
>>
>> Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:
>>
>>> Hi Gordon, Igal,
>>>
>>> Thanks for your replies.
>>> PubSub would be a good addition, I have a few scenarios where that would
>>> be useful.
>>>
>>> However, after reading your answers I realized that your proposed
>>> solutions (which address the most obvious interpretation of my question) do
>>> not necessarily solve my problem. I should have just stated what it was,
>>> instead of trying to propose a solution by discussing broadcast...
>>>
>>> I'm trying to implement an "orchestrator" function which, given an
>>> event, will trigger multiple remote function calls, aggregate their results
>>> and eventually call yet more functions (based on a provided dependency
>>> graph). Hence, this orchestrator function has state per event_id and each
>>> function instance is short-lived (a couple seconds at most, ideally
>>> sub-second). The question then is not about how to modify a long-running
>>> function instance (which PubSub would enable), but rather how to have the
>>> dependency graph available to new functions.
>>>
>>> Given this, Igal's answer seems promising because we have the
>>> FunctionProvider instantiating a local variable and passing it down on
>>> every instantiation. I'm assuming there is one FunctionProvider per
>>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>>> data coming from a Flink DataStream, or receiving StateFun messages?
>>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>>
>>> I really appreciate your help.
>>>
>>> Miguel
>>>
>>> Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:
>>>
>>>> Hi Miguel,
>>>>
>>>> I think that there are a couple of ways to achieve this, and it really
>>>> depends on your specific use case, and the trade-offs
>>>> that you are willing to accept.
>>>>
>>>> For example, one way to approach this:
>>>> - Suppose you have an external service somewhere that returns a
>>>> representation of the logic to be interpreted by
>>>> your function at runtime (I think that is the scenario you are
>>>> describing)
>>>> - Then, you can write a background task (a thread) that periodically
>>>> queries that service, and keeps in memory the latest version.
>>>> - You can initialize this background task in your FunctionProvider
>>>> implementation, or even in your StatefulModule if you wish.
>>>> - Then, make sure that your dynamic stateful function has an access to
>>>> the latest value fetched by your client (for example via a shared reference
>>>> like a j.u.c.AtomicReference)
>>>> - Then on receive, you can simply get that reference and re-apply your
>>>> rules.
>>>>
>>>> Take a look at [1] for example (it is not exactly the same, but I
>>>> believe that it is close enough)
>>>>
>>>> [1]
>>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>>>
>>>> Good luck,
>>>> Igal.
>>>>
>>>>
>>>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast
>>>>> messaging primitive in StateFun:
>>>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>>>
>>>>> This is probably what you are looking for. And I do agree, in the case
>>>>> that the control stream (which updates the application logic) is high
>>>>> volume, redeploying functions may not work well.
>>>>>
>>>>> I don't think there really is a "recommended" way of doing the
>>>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>>>> the moment, at least without FLINK-16319.
>>>>> On the other hand, it could be possible to use stateful

Re: Object and Integer size in RocksDB ValueState

2021-02-23 Thread Roman Khachatryan
Hi Maciej,

If I understand correctly, you're asking whether ValueState parameterized
with Object has the same size as the one with Integer (given that the
actual stored objects (integers) are the same).
With RocksDB, any state object is serialized first and only then it is
stored in MemTable or in an SST file. So it doesn't matter as long as the
same serializer is used.
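
To illustrate the point that stored size follows the serialized form rather than the declared Java type, here is a small JDK-only sketch. It does not use Flink's serializers (which serializer Flink picks for a given declared type is not shown here); `SerializedSizeSketch` and its methods are hypothetical names, and the two encodings are stand-ins chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class SerializedSizeSketch {
    // Fixed-width encoding: an int is always written as 4 bytes,
    // regardless of how the variable holding it was declared.
    static int fixedWidthSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Generic Java serialization of the same value passed as Object:
    // the stream also carries class metadata, so it is larger.
    static int javaSerializationSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

The same integer occupies 4 bytes under the fixed-width encoding and more under generic object serialization, so what matters for state size is which serializer ends up being used.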

You should probably try enabling compression if you haven't already:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Regards,
Roman


On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski <
obuchowski.mac...@gmail.com> wrote:

> Hey.
>
> We have a deduplication job that has a large amount of keyed ValueState. We
> want to decrease state size as much as possible, so we're using
> ValueState<Object> as it's the smallest possible Java non-primitive. However,
> as per https://www.baeldung.com/java-size-of-object (and my measurements)
> Java Integer has the same memory size as Object due to padding.
> Will this still be true with RocksDB state? Can we put Integer in state
> without increasing state size?
>
> Thanks, Maciej
>


Re: [Statefun] Dynamic behavior

2021-02-23 Thread Seth Wiesman
Hey Miguel,

What you are describing is exactly what is implemented in this repo. The
TransactionManager function acts as an orchestrator to work with the other
functions. The repo is structured as an exercise but the full solution
exists on the branch `advanced-solution`.

https://github.com/ververica/flink-statefun-workshop

On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo  wrote:

> Another possibility I am considering is handling this in Flink using a
> broadcast and adding all the information needed to the event itself. I'm a
> little concerned about the amount of data that will be serialized and sent
> on every request though, as I'll need to include information about all
> available remote functions, for instance.
>
> Miguel Araújo  wrote on Tuesday, 23/02/2021 at 09:14:
>
>> Hi Gordon, Igal,
>>
>> Thanks for your replies.
>> PubSub would be a good addition, I have a few scenarios where that would
>> be useful.
>>
>> However, after reading your answers I realized that your proposed
>> solutions (which address the most obvious interpretation of my question) do
>> not necessarily solve my problem. I should have just stated what it was,
>> instead of trying to propose a solution by discussing broadcast...
>>
>> I'm trying to implement an "orchestrator" function which, given an event,
>> will trigger multiple remote function calls, aggregate their results and
>> eventually call yet more functions (based on a provided dependency graph).
>> Hence, this orchestrator function has state per event_id and each function
>> instance is short-lived (a couple seconds at most, ideally sub-second). The
>> question then is not about how to modify a long-running function instance
>> (which PubSub would enable), but rather how to have the dependency graph
>> available to new functions.
>>
>> Given this, Igal's answer seems promising because we have the
>> FunctionProvider instantiating a local variable and passing it down on
>> every instantiation. I'm assuming there is one FunctionProvider per
>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>> data coming from a Flink DataStream, or receiving StateFun messages?
>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>
>> I really appreciate your help.
>>
>> Miguel
>>
>> Igal Shilman  wrote on Monday, 22/02/2021 at 12:09:
>>
>>> Hi Miguel,
>>>
>>> I think that there are a couple of ways to achieve this, and it really
>>> depends on your specific use case, and the trade-offs
>>> that you are willing to accept.
>>>
>>> For example, one way to approach this:
>>> - Suppose you have an external service somewhere that returns a
>>> representation of the logic to be interpreted by
>>> your function at runtime (I think that is the scenario you are
>>> describing)
>>> - Then, you can write a background task (a thread) that periodically
>>> queries that service, and keeps in memory the latest version.
>>> - You can initialize this background task in your FunctionProvider
>>> implementation, or even in your StatefulModule if you wish.
>>> - Then, make sure that your dynamic stateful function has an access to
>>> the latest value fetched by your client (for example via a shared reference
>>> like a j.u.c.AtomicReference)
>>> - Then on receive, you can simply get that reference and re-apply your
>>> rules.
>>>
>>> Take a look at [1] for example (it is not exactly the same, but I
>>> believe that it is close enough)
>>>
>>> [1]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>>
>>> Good luck,
>>> Igal.
>>>
>>>
>>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast
>>>> messaging primitive in StateFun:
>>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>>
>>>> This is probably what you are looking for. And I do agree, in the case
>>>> that the control stream (which updates the application logic) is high
>>>> volume, redeploying functions may not work well.
>>>>
>>>> I don't think there really is a "recommended" way of doing the
>>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>>> the moment, at least without FLINK-16319.
>>>> On the other hand, it could be possible to use stateful functions to
>>>> implement a pub-sub model in user space for the time being. I've actually
>>>> left some ideas for implementing that in the comments of FLINK-16319.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> What is the recommended way of achieving the equivalent of a broadcast
>>>>> in Flink when using Stateful Functions?
>>>>>
>>>>> For instance, assume we are implementing something similar to Flink's
>>>>> demo fraud detection
>>>>>

Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread joris.vanagtmaal
I worked out the rowtype input for the conversion to datastream;

type_info = Types.ROW_NAMED(['sender', 'stw', 'time'],[Types.STRING(),
Types.DOUBLE(), Types.LONG()])
datastream=table_env.to_append_stream(my_table, type_info)

But if i try to assign rowtime and watermarks to the datastream and convert
it back to a table, the rowtime/watermarks either get dropped or maybe they
are not created properly.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread m183
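Do you mean that the flink-dist jar the submission depends on needs to be version 1.12? I have changed it to 1.12 and it still does not work.

> LakeShen  wrote on Feb 23, 2021 at 9:27 PM:
> 
> Your locally configured Flink directory should be the 1.12 version, i.e. the flink-dist directory.
> 
> 
> 
> 凌战  wrote on Tue, Feb 23, 2021 at 7:33 PM:
> 
>> Same here when submitting a job to an on-YARN cluster; the client-side error is also: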
>> 
>> 
>> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
>> YARN application unexpectedly switched to state FAILED during deployment.
>> Diagnostics from YARN: Application application_1610671284452_0243 failed
>> 10 times due to AM Container for appattempt_1610671284452_0243_10
>> exited with  exitCode: 1
>> Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
>> container-launch.
>> Container id: container_e48_1610671284452_0243_10_01
>> Exit code: 1
>> 
>> 
>> [2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
>> Error file: prelaunch.err.
>> Last 4096 bytes of prelaunch.err :
>> 
>> 
>> [2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
>> Error file: prelaunch.err.
>> Last 4096 bytes of prelaunch.err :
>> 
>> 
>> The YARN-side log shows: Could not find or load main class
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
>> 
>> 
>> However, I am using the Flink 1.12 API while the cluster I submit to is still Flink 1.10.1, so I am not sure where the problem lies.
>> 
>> 
>> 凌战
>> m18340872...@163.com
>> 
>> On Feb 23, 2021 at 18:46, LakeShen wrote:
>> Hi community,
>> 
>> After recently upgrading from Flink 1.10 to Flink 1.12, jobs submitted to YARN keep failing with the following error:
>> 
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method
>> caused an error: Failed to execute sql
>> 
>> at
>> 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
>> 
>> at
>> 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
>> 
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> 
>> at
>> 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> 
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> 
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> 
>> at
>> 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> 
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>>     at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
>>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
>>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>>     at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
>>     ... 11 more
>> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
>>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
>>     at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
>>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>>     at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
>>     ... 22 more
>> Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
>> Diagnostics from YARN: Application application_1613992328588_4441 

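Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread 凌战
Do you mean that the flink-dist jar the submission depends on needs to be version 1.12? I have switched it to 1.12 and it still does not work.


凌战
m18340872...@163.com

On Feb 23, 2021 at 21:27, LakeShen wrote:
Your locally configured Flink directory should be the 1.12 version, i.e. the flink-dist directory.



凌战 wrote on Tue, Feb 23, 2021 at 7:33 PM:

I am also submitting a job to a Flink on YARN cluster, and the client-side error is the same: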


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


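The logs on the YARN side show: Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


However, I am using the Flink 1.12 API while the cluster I submit to is still Flink 1.10.1, so I am not sure where the problem lies.


凌战
m18340872...@163.com

On Feb 23, 2021 at 18:46, LakeShen wrote:
Hi community,

We recently upgraded from Flink 1.10 to Flink 1.12, and since then submitting a job to YARN keeps failing with the following error: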


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
    ... 11 more
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
    ... 22 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2 times due to AM Container for appattempt_1613992328588_4441_02 exited with  exitCode: 1
Diagnostics: Exception from container-launch.
Container id: container_xxx
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:575)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)
    at


Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Another possibility I am considering is handling this in Flink using a
broadcast and adding all the information needed to the event itself. I'm a
little concerned about the amount of data that will be serialized and sent
on every request though, as I'll need to include information about all
available remote functions, for instance.

Miguel Araújo wrote on Tuesday, 23/02/2021 at 09:14:

> Hi Gordon, Igal,
>
> Thanks for your replies.
> PubSub would be a good addition, I have a few scenarios where that would
> be useful.
>
> However, after reading your answers I realized that your proposed
> solutions (which address the most obvious interpretation of my question) do
> not necessarily solve my problem. I should have just stated what it was,
> instead of trying to propose a solution by discussing broadcast...
>
> I'm trying to implement an "orchestrator" function which, given an event,
> will trigger multiple remote function calls, aggregate their results and
> eventually call yet more functions (based on a provided dependency graph).
> Hence, this orchestrator function has state per event_id and each function
> instance is short-lived (a couple seconds at most, ideally sub-second). The
> question then is not about how to modify a long-running function instance
> (which PubSub would enable), but rather how to have the dependency graph
> available to new functions.
>
> Given this, Igal's answer seems promising because we have the
> FunctionProvider instantiating a local variable and passing it down on
> every instantiation. I'm assuming there is one FunctionProvider per
> TaskManager. Is there an easy way to have the FunctionProvider receiving
> data coming from a Flink DataStream, or receiving StateFun messages?
> Otherwise, I could have it subscribe to a Kafka topic directly.
>
> I really appreciate your help.
>
> Miguel
>
> Igal Shilman wrote on Monday, 22/02/2021 at 12:09:
>
>> Hi Miguel,
>>
>> I think that there are a couple of ways to achieve this, and it really
>> depends on your specific use case, and the trade-offs
>> that you are willing to accept.
>>
>> For example, one way to approach this:
>> - Suppose you have an external service somewhere that returns a
>> representation of the logic to be interpreted by
>> your function at runtime (I think that is the scenario you are describing)
>> - Then, you can write a background task (a thread) that periodically
>> queries that service, and keeps in memory the latest version.
>> - You can initialize this background task in your FunctionProvider
>> implementation, or even in your StatefulModule if you wish.
>> - Then, make sure that your dynamic stateful function has an access to
>> the latest value fetched by your client (for example via a shared reference
>> like a j.u.c.AtomicReference)
>> - Then on receive, you can simply get that reference and re-apply your
>> rules.
>>
>> Take a look at [1] for example (it is not exactly the same, but I believe
>> that it is close enough)
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>
>> Good luck,
>> Igal.
>>
>>
>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>>> primitive in StateFun:
>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>
>>> This is probably what you are looking for. And I do agree, in the case
>>> that the control stream (which updates the application logic) is high
>>> volume, redeploying functions may not work well.
>>>
>>> I don't think there really is a "recommended" way of doing the
>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>> the moment, at least without FLINK-16319.
>>> On the other hand, it could be possible to use stateful functions to
>>> implement a pub-sub model in user space for the time being. I've actually
>>> left some ideas for implementing that in the comments of FLINK-16319.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo 
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> What is the recommended way of achieving the equivalent of a broadcast
>>>> in Flink when using Stateful Functions?
>>>>
>>>> For instance, assume we are implementing something similar to Flink's
>>>> demo fraud detection but
>>>> in Stateful Functions - how can one dynamically update the application's
>>>> logic then?
>>>> There was a similar question in this mailing list in the past where it
>>>> was recommended moving the dynamic logic to a remote function so
>>>> that one could achieve that by deploying a new container. I think that's
>>>> not very realistic as updates 
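
The background-task approach Igal describes earlier in this thread (a thread that periodically queries an external service and publishes the latest result through a shared AtomicReference) might look roughly like the following. This is a minimal sketch in plain Java, not StateFun code: LatestValueHolder, its fetcher, and the refresh period are hypothetical names, and wiring the holder into a FunctionProvider is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Keeps the most recent value returned by a (hypothetical) external service in memory. */
final class LatestValueHolder<T> implements AutoCloseable {
    private final AtomicReference<T> latest;
    private final Supplier<T> fetcher;
    private final ScheduledExecutorService scheduler;

    LatestValueHolder(Supplier<T> fetcher, T initial) {
        this.fetcher = fetcher;
        this.latest = new AtomicReference<>(initial);
        // Daemon thread, so the refresher never keeps the JVM alive on its own.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "latest-value-refresher");
            t.setDaemon(true);
            return t;
        });
    }

    /** Starts periodic refreshing; the first fetch runs immediately. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::refreshOnce, 0, period, unit);
    }

    /** Fetches once and publishes the result; on failure the previous value is kept. */
    void refreshOnce() {
        try {
            T value = fetcher.get();
            if (value != null) {
                latest.set(value);
            }
        } catch (RuntimeException e) {
            // Swallowed on purpose: an uncaught exception would cancel the periodic task.
        }
    }

    /** Returns the latest published value; safe to call from any thread. */
    T get() {
        return latest.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A FunctionProvider could create one holder and hand it to each function instance it builds, so that every invocation reads holder.get(). Whether there is exactly one provider per TaskManager, as assumed in the thread, is not something this sketch verifies.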

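Re: Reading and writing Hive through plain DDL

2021-02-23 Thread Rui Li
Hello,

Since Hive itself manages its metadata through the metastore, connecting to the metastore's metadata through HiveCatalog is the more natural usage. Flink introduced the Catalog interface precisely to make it easy to integrate with the metadata of external systems. If you use a metadata management platform developed in-house, you could also consider implementing a custom Catalog to integrate with it.

I think maintaining Hive metadata in an in-memory catalog is a bit like manually taking a snapshot of the metastore's metadata. Although it keeps users from accessing the underlying metadata directly, it is not convenient to use; for example, to read a partitioned table you have to manually add the information for every partition to the in-memory catalog.

So if the concern is metadata security, the better approach would be to bring the catalog under the control of your existing authorization mechanism.

On Tue, Feb 23, 2021 at 7:17 PM silence  wrote:

> As I understand it, every company has its own metadata management platform, and creating or modifying Hive tables has to go through strict permission control on that platform, including scheduled tasks, real-time write tasks, data lineage, and so on.
> Personally I think the ideal approach is for all connectors of a single Flink SQL job to be generated from self-maintained metadata, without introducing HiveCatalog, using just the default MemoryCatalog.
> In short, we would rather not introduce HiveCatalog in order to read and write Hive tables.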
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-23 Thread joris.vanagtmaal
Or is this only possible with the DataStream API? I tried converting a table
to a DataStream of rows, but being a noob, I have found it difficult to find
examples of how to do this, and I am not sure how to provide the required
RowTypeInfo.
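
For reference, in Flink SQL a bounded-out-of-orderness watermark can be declared directly in the table DDL with a WATERMARK clause, which avoids going through the DataStream API. The sketch below only builds such a DDL string; the table name, column names, the 5-second delay, and the datagen connector are illustrative assumptions, and the resulting statement would still need to be passed to the table environment (executeSql in Java, execute_sql in PyFlink).

```java
/** Builds a Flink SQL DDL string that declares a bounded-out-of-orderness watermark. */
final class WatermarkDdl {

    /**
     * @param table           hypothetical table name
     * @param timeColumn      name of the event-time column (declared as TIMESTAMP(3))
     * @param maxDelaySeconds how far events may arrive out of order; kept to two digits here,
     *                        since larger values may need an explicit interval precision
     */
    static String createTable(String table, String timeColumn, int maxDelaySeconds) {
        if (maxDelaySeconds < 0 || maxDelaySeconds > 99) {
            throw new IllegalArgumentException("maxDelaySeconds must be between 0 and 99");
        }
        return String.join("\n",
            "CREATE TABLE " + table + " (",
            "  id STRING,",
            "  " + timeColumn + " TIMESTAMP(3),",
            // Watermark = event time minus the allowed out-of-orderness.
            "  WATERMARK FOR " + timeColumn + " AS " + timeColumn
                + " - INTERVAL '" + maxDelaySeconds + "' SECOND",
            ") WITH (",
            "  'connector' = 'datagen'",  // assumption: any source connector could go here
            ")");
    }

    public static void main(String[] args) {
        System.out.println(createTable("events", "event_time", 5));
    }
}
```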



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


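Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread LakeShen
Your locally configured Flink directory should be the 1.12 version, i.e. the flink-dist directory.



凌战 wrote on Tue, Feb 23, 2021 at 7:33 PM:

> I am also submitting a job to a Flink on YARN cluster, and the client-side error is the same: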
>
>
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1610671284452_0243 failed
> 10 times due to AM Container for appattempt_1610671284452_0243_10
> exited with  exitCode: 1
> Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
> container-launch.
> Container id: container_e48_1610671284452_0243_10_01
> Exit code: 1
>
>
> [2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
> Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
>
> [2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
> Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
>
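> The logs on the YARN side show: Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
>
>
> However, I am using the Flink 1.12 API while the cluster I submit to is still Flink 1.10.1, so I am not sure where the problem lies.
>
>
> 凌战
> m18340872...@163.com
>
> On Feb 23, 2021 at 18:46, LakeShen wrote:
> Hi community,
>
> We recently upgraded from Flink 1.10 to Flink 1.12, and since then submitting a job to YARN keeps failing with the following error: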
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>     at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>     at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
>     ... 11 more
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
>     at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
>     ... 22 more
> Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1613992328588_4441 failed 2 times due to AM Container for appattempt_1613992328588_4441_02 exited with  exitCode: 1
> Diagnostics: Exception from container-launch.
> Container id: container_xxx
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
>     at org.apache.hadoop.util.Shell.runCommand(Shell.java:575)
>     at 

Re: Submitting a jar to a Flink on YARN cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread Smile
Hello,
The class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint should live in the flink-yarn
module, and it is bundled into flink-dist as a dependency when the lib package is built.
Why did you add both flink-dist_2.11-1.10.1.jar and flink-yarn_2.11-1.11.1.jar?
Won't those two jars conflict?

Smile
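
One way to spot this kind of mix-up is to extract the Flink version from each jar file name and check that only one version is present. The sketch below does that for the jar names mentioned in this thread; the class and method names are hypothetical, and the file-name pattern (flink-<module>_<scala version>-<flink version>.jar) is an assumption that does not cover every Flink artifact.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Reports which Flink versions appear among a set of jar file names. */
final class FlinkJarVersionCheck {
    // Example: flink-dist_2.11-1.10.1.jar -> Scala 2.11, Flink 1.10.1
    private static final Pattern FLINK_JAR =
        Pattern.compile("flink-[a-z-]+_(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)\\.jar");

    /** Maps each matching jar name to the Flink version found in its file name. */
    static Map<String, String> flinkVersionsByJar(List<String> jarNames) {
        Map<String, String> versions = new LinkedHashMap<>();
        for (String name : jarNames) {
            Matcher m = FLINK_JAR.matcher(name);
            if (m.matches()) {
                versions.put(name, m.group(2));
            }
            // Jars that do not follow the pattern (e.g. shaded Hadoop jars) are skipped.
        }
        return versions;
    }

    /** Returns the distinct Flink versions; more than one entry suggests a mixed classpath. */
    static Set<String> distinctFlinkVersions(List<String> jarNames) {
        return new TreeSet<>(flinkVersionsByJar(jarNames).values());
    }

    public static void main(String[] args) {
        List<String> jars = Arrays.asList(
            "flink-dist_2.11-1.10.1.jar",
            "flink-queryable-state-runtime_2.11-1.10.1.jar",
            "flink-shaded-hadoop-2-uber-2.7.5-10.0.jar",
            "flink-table-blink_2.11-1.10.1.jar",
            "flink-table_2.11-1.10.1.jar",
            "flink-yarn_2.11-1.11.1.jar");
        System.out.println(distinctFlinkVersions(jars)); // prints [1.10.1, 1.11.1]
    }
}
```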



--
Sent from: http://apache-flink.147419.n8.nabble.com/


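Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread Smile
// The previous attempt does not seem to have gone through, so I am resending this another way; apologies for any noise.

Check what the HADOOP_CLASSPATH environment variable contains. Does it match the output of the "hadoop classpath" command?
According to [1], starting with Flink 1.11 the HDFS-related jars are no longer bundled into the Flink distribution by default; instead, the user has to specify the path to the HDFS-related
jars at run time. The statement export HADOOP_CLASSPATH=`hadoop classpath` runs the hadoop
classpath command and assigns its output to the HADOOP_CLASSPATH environment variable.

Also, to isolate variables, could you submit the simplest possible job and see what happens?
Judging from the error log above you are submitting a SQL job; try submitting a DataStream word count and see what error it reports.

References: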
[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
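
The comparison suggested above can also be done programmatically. This is a rough sketch, assuming the output of `hadoop classpath` is handed in as a string (for example as a program argument); HadoopClasspathDiff and its methods are hypothetical helpers, not part of Flink or Hadoop.

```java
import java.io.File;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

/** Compares HADOOP_CLASSPATH against the output of `hadoop classpath`. */
final class HadoopClasspathDiff {

    /** Splits a classpath string into its non-empty entries, preserving order. */
    static Set<String> entries(String classpath, String separator) {
        Set<String> result = new LinkedHashSet<>();
        if (classpath == null) {
            return result;
        }
        for (String entry : classpath.trim().split(Pattern.quote(separator))) {
            if (!entry.isEmpty()) {
                result.add(entry);
            }
        }
        return result;
    }

    /** Returns the entries of the command output that the environment variable lacks. */
    static Set<String> missingFromEnv(String envValue, String commandOutput, String separator) {
        Set<String> missing = entries(commandOutput, separator);
        missing.removeAll(entries(envValue, separator));
        return missing;
    }

    public static void main(String[] args) {
        // args[0] is expected to hold the output of `hadoop classpath`, supplied by the caller.
        String expected = args.length > 0 ? args[0] : "";
        String env = System.getenv("HADOOP_CLASSPATH");
        System.out.println("Entries missing from HADOOP_CLASSPATH: "
            + missingFromEnv(env, expected, File.pathSeparator));
    }
}
```

An empty result means every entry reported by the command is also present in the variable; it does not prove that the jars behind those entries are the right versions.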





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Submitting a jar to a Flink on YARN cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread Smile@LETTers
Hello,
The class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint should live in the
flink-yarn module, and it is bundled into flink-dist as a dependency when the lib package is built.
Why did you add both flink-dist_2.11-1.10.1.jar and flink-yarn_2.11-1.11.1.jar?
Won't those two jars conflict?

Smile
At 2021-02-23 19:27:43, "凌战" wrote:
>The jars I added above were not displayed, so to add: apart from the user jar, the dependency jars currently added are
>flink-dist_2.11-1.10.1.jar
>flink-queryable-state-runtime_2.11-1.10.1.jar
>flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
>flink-table-blink_2.11-1.10.1.jar
>flink-table_2.11-1.10.1.jar
>flink-yarn_2.11-1.11.1.jar
>
>
>凌战
>m18340872...@163.com
>
>On Feb 23, 2021 at 19:02, 凌战 wrote:
>
>
>// Collect every file under the "lib" resource directory as a user classpath entry
>List<URL> userClassPaths = new ArrayList<>();
>File file = ResourceUtils.getFile(
>        new URL(Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toString() + "lib"));
>if (file.isDirectory() && file.listFiles() != null) {
>    for (File ele : Objects.requireNonNull(file.listFiles())) {
>        userClassPaths.add(ele.toURI().toURL());
>    }
>}
>
>// Build the PackagedProgram
>PackagedProgram packagedProgram =
>        PackagedProgram.newBuilder()
>                .setJarFile(jar)
>                .setUserClassPaths(userClassPaths)
>                .build();
>
>// Get the configuration directory
>String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
>
>// 2. load the global configuration
>// Load flink-conf.yaml to build the Configuration
>Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
>
>// 3. Register the job jar and its dependencies
>ConfigUtils.encodeCollectionToConfig(
>        configuration,
>        PipelineOptions.JARS,
>        packagedProgram.getJobJarAndDependencies(),
>        URL::toString);
>
>ConfigUtils.encodeCollectionToConfig(
>        configuration,
>        PipelineOptions.CLASSPATHS,
>        packagedProgram.getClasspaths(),
>        URL::toString);
>
>Pipeline pipeline =
>        this.wrapClassLoader(packagedProgram.getUserCodeClassLoader(), () -> {
>            try {
>                return PackagedProgramUtils.getPipelineFromProgram(
>                        packagedProgram, configuration, 10, false);
>            } catch (ProgramInvocationException e) {
>                e.printStackTrace();
>                return null;
>            }
>        });
>
>// yarn-per-job mode
>return new PlatformAbstractJobClusterExecutor<>(new YarnClusterClientFactory())
>        .execute(pipeline, configuration, packagedProgram.getUserCodeClassLoader())
>        .get()
>        .getJobID()
>        .toString();
>
>
>The dependency jars added here are as follows
>
>
>But the following error occurs:
>
>
>2021-02-23 18:49:23.404  INFO 26116 --- [nio-8080-exec-4] 
>o.a.flink.api.java.utils.PlanGenerator   : The job has 0 registered types and 
>0 default Kryo serializers
>2021-02-23 18:50:38.746  INFO 26116 --- [nio-8080-exec-4] 
>o.a.flink.yarn.YarnClusterDescriptor : No path for the flink jar passed. 
>Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to 
>locate the jar
>2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
>.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
>not specified, use the configured deprecated task manager heap value 
>(1024.000mb (1073741824 bytes)) for it.
>2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
>o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
>overhead memory (102.400mb (107374184 bytes)) is less than its min value 
>192.000mb (201326592 bytes), min value will be used instead
>2021-02-23 18:50:41.159  INFO 26116 --- [nio-8080-exec-4] 
>.h.y.c.ConfiguredRMFailoverProxyProvider : Failing over to rm80
>2021-02-23 18:50:41.848  INFO 26116 --- [nio-8080-exec-4] 
>o.a.flink.yarn.YarnClusterDescriptor : Cluster specification: 
>ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, 
>slotsPerTaskManager=1}
>2021-02-23 18:50:41.849  WARN 26116 --- [nio-8080-exec-4] 
>o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] 
>does not exist.
>2021-02-23 18:50:42.555  WARN 26116 --- [nio-8080-exec-4] 
>o.a.flink.yarn.YarnClusterDescriptor : Environment variable 
>'FLINK_LIB_DIR' not set and ship files have not been provided manually. Not 
>shipping any library files.
>2021-02-23 18:50:42.556  WARN 26116 --- [nio-8080-exec-4] 
>o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] 
>does not exist.
>2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
>.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
>not specified, use the configured deprecated task manager heap value 
>(1024.000mb (1073741824 bytes)) for it.
>2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
>o.a.f.r.u.c.memory.ProcessMemoryUtils: The 

Object and Integer size in RocksDB ValueState

2021-02-23 Thread Maciej Obuchowski
Hey.

We have a deduplication job that has a large amount of keyed ValueState. We
want to decrease state size as much as possible, so we're using
ValueState<Object> as it's the smallest possible Java non-primitive. However,
as per https://www.baeldung.com/java-size-of-object (and my measurements)
a Java Integer has the same memory size as an Object due to padding.
Will this still be true with RocksDB state? Can we put Integer in state
without increasing state size?

Thanks, Maciej
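
A note on the question above: the RocksDB state backend keeps state as serialized bytes rather than as Java objects on the heap, so JVM object headers and padding should not carry over to the stored size. The sketch below is not Flink code; it only shows that a plain int written through java.io.DataOutputStream occupies 4 bytes, on the assumption that the serializer chosen for an Integer state writes a fixed-width int in the same way. Which serializer Flink actually picks for a given state descriptor, and what it writes for a plain Object, would need to be checked separately.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Measures how many bytes a single int takes once written to a byte stream. */
final class SerializedIntSize {

    static int serializedSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value); // fixed-width, big-endian, 4 bytes
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println(serializedSize(42)); // prints 4
    }
}
```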


Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread 凌战
I am also submitting a job to a Flink on YARN cluster, and the client-side error is the same:


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN 
application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1610671284452_0243 failed 10 
times due to AM Container for appattempt_1610671284452_0243_10 exited with  
exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from 
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


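The logs on the YARN side show: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


However, I am using the Flink 1.12 API while the cluster I submit to is still Flink 1.10.1, so I am not sure where the problem lies.


凌战
m18340872...@163.com

On Feb 23, 2021 at 18:46, LakeShen wrote:
Hi community,

We recently upgraded from Flink 1.10 to Flink 1.12, and since then submitting a job to YARN keeps failing with the following error: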


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
    ... 11 more
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
    ... 22 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2 times due to AM Container for appattempt_1613992328588_4441_02 exited with  exitCode: 1
Diagnostics: Exception from container-launch.
Container id: container_xxx
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:575)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at

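Re: Flink 1.12 on YARN job submission failure

2021-02-23 Thread Smile@LETTers
Check what the HADOOP_CLASSPATH environment variable contains. Does it match the output of the "hadoop classpath" command?
According to [1], starting with Flink 1.11 the HDFS-related jars are no longer bundled into the Flink distribution by default; instead, the user has to specify the path to the HDFS-related 
jars at run time. The statement export HADOOP_CLASSPATH=`hadoop classpath` runs the hadoop classpath 
command and assigns its output to the HADOOP_CLASSPATH environment variable.

Also, to isolate variables, could you submit the simplest possible job and see what happens?
Judging from the error log above you are submitting a SQL job; try submitting a DataStream word count and see what error it reports.

References: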
[1]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html

Re: Submitting a jar to a Flink on YARN cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread 凌战
The jars I added above were not displayed, so to add: apart from the user jar, the dependency jars currently added are
flink-dist_2.11-1.10.1.jar
flink-queryable-state-runtime_2.11-1.10.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-table-blink_2.11-1.10.1.jar
flink-table_2.11-1.10.1.jar
flink-yarn_2.11-1.11.1.jar


凌战
m18340872...@163.com

On Feb 23, 2021 at 19:02, 凌战 wrote:


// Collect every file under the "lib" resource directory as a user classpath entry
List<URL> userClassPaths = new ArrayList<>();
File file = ResourceUtils.getFile(
        new URL(Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toString() + "lib"));
if (file.isDirectory() && file.listFiles() != null) {
    for (File ele : Objects.requireNonNull(file.listFiles())) {
        userClassPaths.add(ele.toURI().toURL());
    }
}

// Build the PackagedProgram
PackagedProgram packagedProgram =
        PackagedProgram.newBuilder()
                .setJarFile(jar)
                .setUserClassPaths(userClassPaths)
                .build();

// Get the configuration directory
String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();

// 2. load the global configuration
// Load flink-conf.yaml to build the Configuration
Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

// 3. Register the job jar and its dependencies
ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.JARS,
        packagedProgram.getJobJarAndDependencies(),
        URL::toString);

ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.CLASSPATHS,
        packagedProgram.getClasspaths(),
        URL::toString);

Pipeline pipeline =
        this.wrapClassLoader(packagedProgram.getUserCodeClassLoader(), () -> {
            try {
                return PackagedProgramUtils.getPipelineFromProgram(
                        packagedProgram, configuration, 10, false);
            } catch (ProgramInvocationException e) {
                e.printStackTrace();
                return null;
            }
        });

// yarn-per-job mode
return new PlatformAbstractJobClusterExecutor<>(new YarnClusterClientFactory())
        .execute(pipeline, configuration, packagedProgram.getUserCodeClassLoader())
        .get()
        .getJobID()
        .toString();


The dependency jars added here are as follows


But the following error occurs:


2021-02-23 18:49:23.404  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.api.java.utils.PlanGenerator   : The job has 0 registered types and 0 
default Kryo serializers
2021-02-23 18:50:38.746  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : No path for the flink jar passed. 
Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to 
locate the jar
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:41.159  INFO 26116 --- [nio-8080-exec-4] 
.h.y.c.ConfiguredRMFailoverProxyProvider : Failing over to rm80
2021-02-23 18:50:41.848  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, 
slotsPerTaskManager=1}
2021-02-23 18:50:41.849  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:42.555  WARN 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Environment variable 'FLINK_LIB_DIR' 
not set and ship files have not been provided manually. Not shipping any 
library files.
2021-02-23 18:50:42.556  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:48.554  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Submitting application master 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 

Re: Submitting a jar to a Flink on Yarn cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread 凌战
The jars I listed above did not show up, so here they are again. Apart from the user jar, the dependency jars currently added are:
flink-dist_2.11-1.10.1.jar
flink-queryable-state-runtime_2.11-1.10.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-table-blink_2.11-1.10.1.jar
flink-table_2.11-1.10.1.jar
flink-yarn_2.11-1.11.1.jar


But submitting to Flink on Yarn still fails with the same error.



Re: Reading and writing Hive through plain DDL

2021-02-23 Thread silence
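My understanding is that every company has its own metadata management platform, and that creating or modifying Hive tables has to go through strict permission control on that platform, along with scheduling jobs, real-time write jobs, data lineage, and so on.
Personally, I think the ideal approach is for all connectors of a single Flink SQL job to be generated from self-maintained metadata, without introducing HiveCatalog; the default MemoryCatalog would be enough.
In short, we would rather not introduce HiveCatalog in order to read and write Hive tables.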



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Submitting a jar to a Flink on Yarn cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread 凌战


List<URL> userClassPaths = new ArrayList<>();
File file = ResourceUtils.getFile(new URL(
        Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toString() + "lib"));
if (file.isDirectory() && file.listFiles() != null) {
    for (File ele : Objects.requireNonNull(file.listFiles())) {
        userClassPaths.add(ele.toURI().toURL());
    }
}


// Build the PackagedProgram
PackagedProgram packagedProgram =
        PackagedProgram.newBuilder()
                .setJarFile(jar)
                .setUserClassPaths(userClassPaths)
                .build();


// Get the Configuration
String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();


// 2. load the global configuration
// Load flink-conf.yaml to build the Configuration
Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);


// 3. Load the jars
ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.JARS,
        packagedProgram.getJobJarAndDependencies(),
        URL::toString);


ConfigUtils.encodeCollectionToConfig(
        configuration,
        PipelineOptions.CLASSPATHS,
        packagedProgram.getClasspaths(),
        URL::toString);


Pipeline pipeline =
        this.wrapClassLoader(packagedProgram.getUserCodeClassLoader(), () -> {
            try {
                return PackagedProgramUtils.getPipelineFromProgram(
                        packagedProgram, configuration, 10, false);
            } catch (ProgramInvocationException e) {
                e.printStackTrace();
                return null;
            }
        });


// yarn-per-job mode
return new PlatformAbstractJobClusterExecutor<>(new YarnClusterClientFactory())
        .execute(pipeline, configuration, packagedProgram.getUserCodeClassLoader())
        .get().getJobID().toString();
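The first step of the code above, turning every file under a lib directory into a classpath URL, can be tried on its own. The following is a minimal sketch using only the JDK; LibJarCollector and collectJarUrls are hypothetical names, and unlike the original loop it keeps only regular files ending in .jar and returns an empty list when the directory does not exist.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink): lists the jar files directly under a directory as URLs. */
final class LibJarCollector {

    static List<URL> collectJarUrls(Path libDir) throws IOException {
        List<URL> urls = new ArrayList<>();
        if (!Files.isDirectory(libDir)) {
            return urls; // nothing to add when the directory is missing
        }
        // Files.list must be closed, hence the try-with-resources.
        try (Stream<Path> entries = Files.list(libDir)) {
            List<Path> jars = entries
                    .filter(p -> Files.isRegularFile(p))
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .sorted() // stable order makes the resulting classpath reproducible
                    .collect(Collectors.toList());
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        // Assumption for illustration: the directory defaults to ./lib.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        for (URL url : collectJarUrls(libDir)) {
            System.out.println(url);
        }
    }
}
```

Printing the collected URLs before handing them to setUserClassPaths is a cheap way to confirm which jars actually end up on the user classpath.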






The dependency jars added here are as follows:


But the following error occurs:


2021-02-23 18:49:23.404  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.api.java.utils.PlanGenerator   : The job has 0 registered types and 0 
default Kryo serializers
2021-02-23 18:50:38.746  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : No path for the flink jar passed. 
Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to 
locate the jar
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:41.159  INFO 26116 --- [nio-8080-exec-4] 
.h.y.c.ConfiguredRMFailoverProxyProvider : Failing over to rm80
2021-02-23 18:50:41.848  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, 
slotsPerTaskManager=1}
2021-02-23 18:50:41.849  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:42.555  WARN 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Environment variable 'FLINK_LIB_DIR' 
not set and ship files have not been provided manually. Not shipping any 
library files.
2021-02-23 18:50:42.556  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:48.554  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Submitting application master 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 
o.a.h.y.client.api.impl.YarnClientImpl   : Submitted application 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Waiting for the cluster to be 
allocated
2021-02-23 18:50:49.281  INFO 26116 --- [nio-8080-exec-4] 

Re: Submitting a jar to a Flink on Yarn cluster via the local API fails with Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 Thread 凌战






Re: Reading and writing Hive through plain DDL

2021-02-23 Thread Rui Li
Hi,

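Let me try to answer the questions you raised.

1. If you don't want users to create tables directly in the metastore, then as I understand it the only Hive tables users could create would be temporary tables. HiveCatalog does not support temporary Hive tables yet, but the community already plans to add this, and if all goes well it can land in 1.13. Setting Flink aside, I'd like to understand how you solve this problem in Hive itself. Do you also only allow users to create temporary tables? Or do you use some permission-control mechanism to restrict which users can create tables?

2. For tables that already exist in the Hive metastore, reading and writing data through Flink does not require changing any table property, unless you want to change attributes of the table itself (such as format, SerDe, and so on). This matches how things are done in Hive.

3. To create a Hive table without the Hive dialect, you can try adding the 'is_generic'='false' option, but the table still has to be created in a HiveCatalog. Also, what this approach can express is very limited; basically it can only create simple text tables.

4. This question is similar to #1, and temporary tables can achieve this effect as well.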

On Tue, Feb 23, 2021 at 5:58 PM silence  wrote:

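> Hello,
> Thanks for the reply.
> The main reasons are:
> 1. Creating and modifying Hive tables directly through the Hive catalog is too risky; we would rather restrict creation and modification of Hive tables at the platform layer.
>
> 2. The connector configuration is stored in the Hive table's DBPROPERTIES. Does that mean that writing to an existing Hive table through Flink requires first changing the table's properties with an ALTER statement? We don't want to expose the ability to alter Hive tables directly to users.
> 3. Using plain DDL keeps the style consistent with existing connector definitions, with no need to switch dialects back and forth.
> 4. The configuration does not need to be persisted; using GenericInMemoryCatalog is enough.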
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Flink 1.12 on Yarn: job submission fails

2021-02-23 Thread LakeShen
Hi community,

  I recently upgraded from Flink 1.10 to Flink 1.12, and when submitting a job to Yarn it keeps failing with the following error:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
    ... 11 more
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
    ... 22 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2 times due to AM Container for appattempt_1613992328588_4441_02 exited with  exitCode: 1
Diagnostics: Exception from container-launch.
Container id: container_xxx
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:575)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


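  Related information:
  1. My Flink job has no Hadoop-related dependencies.
  2. The machine that submits the job and every machine in the Hadoop cluster have the HADOOP_CLASSPATH environment variable set.
  3. After the Flink job is submitted to Yarn, its state goes from Accepted to FAILED.

  I hope someone can help me figure this out. Thanks.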

  Best,
  LakeShen


Re: Reading and writing Hive through plain DDL

2021-02-23 Thread silence
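Hello,
Thanks for the reply.
The main reasons are:
1. Creating and modifying Hive tables directly through the Hive catalog is too risky; we would rather restrict creation and modification of Hive tables at the platform layer.
2. The connector configuration is stored in the Hive table's DBPROPERTIES. Does that mean that writing to an existing Hive table through Flink requires first changing the table's properties with an ALTER statement? We don't want to expose the ability to alter Hive tables directly to users.
3. Using plain DDL keeps the style consistent with existing connector definitions, with no need to switch dialects back and forth.
4. The configuration does not need to be persisted; using GenericInMemoryCatalog is enough.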




Re: [Statefun] Dynamic behavior

2021-02-23 Thread Miguel Araújo
Hi Gordon, Igal,

Thanks for your replies.
PubSub would be a good addition, I have a few scenarios where that would be
useful.

However, after reading your answers I realized that your proposed solutions
(which address the most obvious interpretation of my question) do not
necessarily solve my problem. I should have just stated what it was,
instead of trying to propose a solution by discussing broadcast...

I'm trying to implement an "orchestrator" function which, given an event,
will trigger multiple remote function calls, aggregate their results and
eventually call yet more functions (based on a provided dependency graph).
Hence, this orchestrator function has state per event_id and each function
instance is short-lived (a couple seconds at most, ideally sub-second). The
question then is not about how to modify a long-running function instance
(which PubSub would enable), but rather how to have the dependency graph
available to new functions.

Given this, Igal's answer seems promising because we have the
FunctionProvider instantiating a local variable and passing it down on
every instantiation. I'm assuming there is one FunctionProvider per
TaskManager. Is there an easy way to have the FunctionProvider receiving
data coming from a Flink DataStream, or receiving StateFun messages?
Otherwise, I could have it subscribe to a Kafka topic directly.
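A minimal sketch of the pattern Igal describes in the quoted message (a background task that periodically refreshes a shared j.u.c.AtomicReference), written against the plain JDK only. RefreshingReference, the loader supplier, and the thread name are hypothetical names for illustration; no StateFun API is used, and how the holder is handed to each function instance (for example from a FunctionProvider) is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical holder: keeps the latest value returned by a loader, refreshed in the background. */
final class RefreshingReference<T> implements AutoCloseable {
    private final AtomicReference<T> latest;
    private final Supplier<T> loader;
    private final ScheduledExecutorService scheduler;

    RefreshingReference(T initial, Supplier<T> loader) {
        this.latest = new AtomicReference<>(initial);
        this.loader = loader;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dependency-graph-refresher");
            t.setDaemon(true); // do not keep the JVM alive just for refreshing
            return t;
        });
    }

    /** Starts periodic refreshes; the first one runs immediately. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::refreshOnce, 0, period, unit);
    }

    /** Loads a new value once; on failure or null the previous value is kept. */
    void refreshOnce() {
        try {
            T value = loader.get();
            if (value != null) {
                latest.set(value);
            }
        } catch (RuntimeException e) {
            // Swallow the error: an exception escaping here would cancel all future refreshes.
        }
    }

    /** Called by readers (e.g. on every received message) to get the latest value. */
    T get() {
        return latest.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        // The loader is a stand-in for a call to an external service or a Kafka consumer.
        try (RefreshingReference<String> graph =
                new RefreshingReference<>("graph-v0", () -> "graph-loaded-at-" + System.nanoTime())) {
            graph.start(100, TimeUnit.MILLISECONDS);
            Thread.sleep(250);
            System.out.println(graph.get());
        }
    }
}
```

Readers never block in this sketch: they see either the previous or the new value, so a short-lived function instance can take one snapshot when it is created and use it for its whole lifetime.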

I really appreciate your help.

Miguel

On Monday, 22/02/2021 at 12:09, Igal Shilman wrote:

> Hi Miguel,
>
> I think that there are a couple of ways to achieve this, and it really
> depends on your specific use case, and the trade-offs
> that you are willing to accept.
>
> For example, one way to approach this:
> - Suppose you have an external service somewhere that returns a
> representation of the logic to be interpreted by
> your function at runtime (I think that is the scenario you are describing)
> - Then, you can write a background task (a thread) that periodically
> queries that service, and keeps in memory the latest version.
> - You can initialize this background task in your FunctionProvider
> implementation, or even in your StatefulModule if you wish.
> - Then, make sure that your dynamic stateful function has an access to the
> latest value fetched by your client (for example via a shared reference
> like a j.u.c.AtomicReference)
> - Then on receive, you can simply get that reference and re-apply your
> rules.
>
> Take a look at [1] for example (it is not exactly the same, but I believe
> that it is close enough)
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>
> Good luck,
> Igal.
>
>
> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>> primitive in StateFun:
>> https://issues.apache.org/jira/browse/FLINK-16319
>>
>> This is probably what you are looking for. And I do agree, in the case
>> that the control stream (which updates the application logic) is high
>> volume, redeploying functions may not work well.
>>
>> I don't think there really is a "recommended" way of doing the "broadcast
>> control stream, join with main stream" pattern with StateFun at the moment,
>> at least without FLINK-16319.
>> On the other hand, it could be possible to use stateful functions to
>> implement a pub-sub model in user space for the time being. I've actually
>> left some ideas for implementing that in the comments of FLINK-16319.
>>
>> Cheers,
>> Gordon
>>
>>
>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> What is the recommended way of achieving the equivalent of a broadcast
>>> in Flink when using Stateful Functions?
>>>
>>> For instance, assume we are implementing something similar to Flink's
>>> demo fraud detection but in Stateful Functions - how can one
>>> dynamically update the application's logic then?
>>> There was a similar question in this mailing list in the past where it
>>> was recommended moving the dynamic logic to a remote function so
>>> that one could achieve that by deploying a new container. I think that's
>>> not very realistic as updates might happen with a frequency that's not
>>> compatible with that approach (e.g., sticking to the fraud detection
>>> example, updating fraud detection rules every hour is not unusual), nor
>>> should one be deploying a new container when data (not code) changes.
>>>
>>> Is there a way of, for example, modifying FunctionProviders on the fly?
>>>
>>> Thanks,
>>> Miguel
>>>
>>


Re: Reading and writing Hive through plain DDL

2021-02-23 Thread Rui Li
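Hello,

May I ask why you don't want to use HiveCatalog to read and write Hive tables? Is it that you don't want the Hive tables to be persisted (similar to temporary tables), or that you don't want to maintain a metastore server?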

On Tue, Feb 23, 2021 at 2:57 PM silence  wrote:

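> Does the community have any plans to support reading and writing Hive tables with plain DDL (without the Hive catalog)?
> Is there a particular reason it isn't supported at the moment?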
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Fwd: Flink custom trigger use case

2021-02-23 Thread Diwakar Jha
Hello,

posting again for help. I'm planning to use state TTL but would like to
know if there is any other way to do it. I'm using Flink 1.11.
Thanks!

-- Forwarded message -
From: Diwakar Jha 
Date: Mon, Feb 22, 2021 at 6:28 PM
Subject: Flink custom trigger use case
To: user 



Hello,

I'm trying to use a custom trigger for one of my use cases. My basic
logic (shown below) applies keyBy to the input stream and uses a
window of 1 min.

.keyBy()
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(new CustomTrigger())
.aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());


My custom trigger is expected to fire the first event of the keyBy
instantly and any subsequent events should be aggregated in the window.

.trigger(new Trigger<Record, TimeWindow>() {
    @Override
    public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Boolean> firstSeen =
                triggerContext.getPartitionedState(firstSceenDescriptor);
        if (firstSeen.value() == null) {
            firstSeen.update(true);
            // fire trigger to early evaluate window and purge that event.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Continue. Do not evaluate window per element
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        // final evaluation and purge window state
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
            throws Exception {
    }
})




Currently, for each window and the same key, the first event of the
window is always fired. But I want this to happen only for the first
window; for subsequent windows it should aggregate all the events and
then fire.

Example: all the records have the same key.

Current output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3 : first event in window-2 : fired
record 4, record 5 : 2 events in window-2 : fired

Expected output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3,4,5 : all events in window-2 : fired
window-2 should not fire the first event of the same key.

I'm reading it here
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
but not able to solve it. Any pointers would be helpful.

Thanks.
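
One possible reading of the current output, offered as an assumption rather than a confirmed diagnosis: state obtained through TriggerContext.getPartitionedState is scoped to the current key and window, so firstSeen starts out empty again in every new window. The plain-Java simulation below (no Flink dependency; FirstSeenScopes and earlyFires are hypothetical names) contrasts a flag scoped per key and window with a flag scoped per key only, using five records with the same key, two in window-1 and three in window-2.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of a "first seen" flag under two different state scopes. */
final class FirstSeenScopes {
    static final long WINDOW_MS = 60_000L; // 1-minute tumbling windows, as in the message

    /**
     * Returns the 1-based numbers of the records that would fire early,
     * i.e. the records for which the "first seen" flag was still unset.
     */
    static List<Integer> earlyFires(String[] keys, long[] timestamps, boolean scopePerWindow) {
        Set<String> seen = new HashSet<>();
        List<Integer> fired = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            long windowStart = timestamps[i] - (timestamps[i] % WINDOW_MS);
            // Per-window scope: the flag is looked up under (key, window).
            // Per-key scope: the flag is looked up under the key alone.
            String stateKey = scopePerWindow ? keys[i] + "@" + windowStart : keys[i];
            if (seen.add(stateKey)) {
                fired.add(i + 1);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] keys = {"k", "k", "k", "k", "k"};
        // Records 1-2 fall into window-1, records 3-5 into window-2.
        long[] timestamps = {1_000L, 30_000L, 61_000L, 70_000L, 80_000L};

        System.out.println(earlyFires(keys, timestamps, true));  // prints [1, 3]
        System.out.println(earlyFires(keys, timestamps, false)); // prints [1]
    }
}
```

With the per-window scope, records 1 and 3 fire early, which matches the current output; with the per-key scope only record 1 does, which matches the expected output.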


Re: Question about Streaming File Sink in Flink 1.11

2021-02-23 Thread Robin Zhang
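Hi, op

Flink can provide exactly-once semantics internally, but writing to HDFS is at-least-once. If the job fails and restarts, data can be duplicated, so you need to add your own logic to handle that.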

Best,
Robin


op wrote
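> Hi all,
>   I'd like to know whether Streaming File Sink in Flink 1.11 supports
> exactly-once semantics when saving streaming data to HDFS. The official site doesn't seem to say. Thanks!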




