Re: FLINK-13497 / "Could not create file for checking if truncate works" / HDFS

2019-10-14 Thread Congxian Qiu
Hi

From the given stack trace, maybe you could solve the "replication problem"
first: "File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be
replicated to 0 nodes instead of minReplication (=1). There are 2
datanode(s) running and no node(s) are excluded in this operation."
Maybe the answer from SO[1] can help.

[1]
https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025
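
If fixing the HDFS replication issue takes time, a stopgap is the workaround
named in the exception message itself: disabling the truncate() probe on the
sink. A minimal sketch (the "stream" variable, element type and base path are
placeholders, not taken from the original job):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    BucketingSink<String> sink = new BucketingSink<>("hdfs:///okd-dev/output");
    // Skip the reflective truncate() check that fails in the stack trace below;
    // recovery then falls back to writing .valid-length marker files instead.
    sink.setUseTruncate(false);
    stream.addSink(sink);
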
Best,
Congxian


Adrian Vasiliu wrote on Mon, Oct 14, 2019 at 9:10 PM:

> Hello,
>
> We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we are
> experiencing repeatedly failing jobs with
>
> java.lang.RuntimeException: Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
> at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
>
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
> at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy49.addBlock(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
> at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> at com.sun.proxy.$Proxy50.addBlock(Unknown Source)
> 

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-14 Thread Vijay Balakrishnan
Hi,
Thanks for the replies, Congxian & Dawid.
Watermarks are advancing. Not sure how to check whether every newly generated
watermark is reaching the end of the window.

I did check the Flink UI for the currentInputWatermark and it is increasing
monotonically.

Narrowed the problem down to windowStream.aggregate never being called.
I also *added a checkpoint* to see if that was causing the issue. It didn't seem
to help.
Most of the code is reached during the creation of the ExecutionGraph on
the start of the program.

I generate an incrementing sequence of timestamps (with a delay of 5000 ms between
records) from a producer to Kinesis, and a new watermark is emitted as the job
starts receiving the input records.
My window size is 15s.
I see a WindowedStream is created with windowAssigner:
TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger,
but the *code never gets into EventTimeTrigger.onElement() or
onEventTime() to fire the trigger*.
It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
I even tried using ProcessingTime but that also didn't help.


//code to create kinesis consumer successfully..
for (Rule rule : rules.getRules()) {
//gets in here fine
final SingleOutputStreamOperator>
filteredKinesisStream = kinesisStream.filter(mon -> {
boolean result;
String eventName = mon.get(MEASUREMENT) != null ? (String)
mon.get(MEASUREMENT) : "";
InputMetricSelector inputMetricSelector =
rule.getInputMetricSelector();
String measurement = inputMetricSelector != null ?
inputMetricSelector.getMeasurement() : "";
result = eventName.equals(measurement);
if (result) {
Map inputTags = mon.get(TAGS) != null ?
(Map) mon.get(TAGS) : new HashMap<>();
Map ruleTags = inputMetricSelector !=
null ? inputMetricSelector.getTags() : new HashMap<>();
result = matchTags(inputTags, ruleTags);
}
return result;//*<== this is true*
}
).flatMap((FlatMapFunction, Map>)
(input, out) -> {
out.collect(input);//*< runs up till here fine*
}).returns(new TypeHint>() {
});
//*doesn't do anything beyond this point at runtime*
DataStream enrichedMGStream =
pms.createAggregatedMonitoringGroupingWindowStream1
(filteredKinesisStream, ruleFactory, rule, parallelProcess);
enrichedMGStream.addSink(influxSink)
.setParallelism(nbrSinks);
}

private DataStream
createAggregatedMonitoringGroupingWindowStream1(DataStream> kinesisStream, RuleFactory ruleFactory, Rule rule, int
parallelProcess) {
DataStream enrichedComponentInstanceStream1;
RuleConfig ruleConfig = rule.getRuleConfig();
String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
RuleIF ruleImpl = ruleFactory.getRule(ruleType);
Map ruleProps = ruleConfig != null ?
ruleConfig.getRuleProps() : new HashMap<>();
Object intervalObj = ruleProps.get("rule_eval_window");
String timeInterval = intervalObj != null ? (String) intervalObj : "";
org.apache.flink.streaming.api.windowing.time.Time timeWindow =
getTimeWindowFromInterval(timeInterval);

Object windowTypeObj = ruleProps.get("window_type");
String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
Map tags = inputMetricSelector != null ?
inputMetricSelector.getTags() : new HashMap<>();
String groupByObj = tags.get(GROUP_BY);
String groupBy = groupByObj != null ? groupByObj : "";
kinesisStream = kinesisStream.filter((FilterFunction>) inputMap -> {
Object groupByValueObj = inputMap.get(groupBy);
return groupByValueObj != null;
});
Set groupBySet = new
HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
//till here, it went through fine during creation of the ExecutionGraph
KeyedStream, MonitoringTuple>
monitoringTupleKeyedStream =
kinesisStream.keyBy(new MapTupleKeySelector(groupBySet,
metric));*<=== never gets into the MapTupleKeySelector.getKey() - a similar
class works in another project*
enrichedComponentInstanceStream1 =
getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
windowType, timeInterval, ruleImpl, rule, parallelProcess);
return enrichedComponentInstanceStream1;
}

private DataStream
getMonitoringGroupDataStream1(KeyedStream,
MonitoringTuple> monitoringTupleKeyedStream,

org.apache.flink.streaming.api.windowing.time.Time timeWindow, String
windowType,
String
interval,
RuleIF
ruleImpl, Rule rule, int parallelProcess) {
long slide = 100;
final WindowedStream, MonitoringTuple, TimeWindow>
windowStream =
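
For comparison, here is a stripped-down, self-contained sketch of the pattern
being debugged (keyBy + 15s tumbling event-time window + AggregateFunction).
The element type, key field and timestamps are assumptions for illustration,
not the original MonitoringTuple pipeline; add() should be invoked once per
element that lands in a window:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowAggregateSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Stand-in for the Kinesis source; each record carries its own "ts" field.
            DataStream<Map<String, Object>> input = env
                .fromElements(record(0L), record(5000L), record(20000L))
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Map<String, Object>>(Time.seconds(1)) {
                        @Override
                        public long extractTimestamp(Map<String, Object> m) {
                            return (long) m.get("ts");
                        }
                    });

            input.keyBy(m -> String.valueOf(m.get("group")))
                 .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                 .aggregate(new CountAgg())   // add() runs once per element in the window
                 .print();

            env.execute("window-aggregate-sketch");
        }

        private static Map<String, Object> record(long ts) {
            Map<String, Object> m = new HashMap<>();
            m.put("ts", ts);
            m.put("group", "g1");
            return m;
        }

        // Simple count aggregate; the window result is emitted once the watermark
        // passes the end of the 15s window.
        private static class CountAgg
                implements AggregateFunction<Map<String, Object>, Long, Long> {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(Map<String, Object> value, Long acc) { return acc + 1; }
            @Override public Long getResult(Long acc) { return acc; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        }
    }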

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-14 Thread Vijay Bhaskar
Thank you, Till. We will try to shift to the latest Flink version.

Regards
Bhaskar

On Mon, Oct 14, 2019 at 7:43 PM Till Rohrmann  wrote:

> Hi Vijay,
>
> Flink usually writes the checkpoint data to disk first and then writes the
> pointer to the files to ZooKeeper. Hence, if you see a ZooKeeper entry,
> then the files should be there. I assume that there is no other process
> accessing and potentially removing files from the checkpoint directories,
> right?
>
> Have you tried to run one of the latest Flink versions? Flink 1.6.2 is no
> longer actively supported by the community.
>
> Cheers,
> Till
>
> On Fri, Oct 11, 2019 at 11:39 AM Vijay Bhaskar 
> wrote:
>
>> Apart from these we have another environment where checkpointing worked
>> fine in HA mode with a complete cluster restart. But for one of the jobs we are
>> seeing an issue: the checkpoint path is retrieved from ZooKeeper, but that
>> checkpoint path cannot be found in persistent storage. I am wondering
>> why this would happen in the first place.
>> Is there any sync issue between writing the file to the persistent path and
>> registering the file with the HA service? For example, the checkpoint has been
>> registered in ZooKeeper but has not been written yet when the cluster is
>> restarted? I suspect this kind of problem can happen. We are using Flink
>> 1.6.2 in production. Is this an issue that was already known and fixed
>> recently?
>>
>> Regards
>> Bhaskar
>>
>> On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar 
>> wrote:
>>
>>> We were seeing the logs below in production some time ago; after that we
>>> stopped HA. Do you think HA is enabled properly, judging from the logs below?
>>>
>>> Regards
>>> Bhaskar
>>>
>>> 2019-09-24 17:40:17,675 INFO
>>>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Starting ZooKeeperLeaderElectionService
>>> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>>> 2019-09-24 17:40:17,675 INFO
>>>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>>  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>> 2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
>>>- Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
>>>- Association with remote system [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
>>>- Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
>>>- Association with remote system [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
>>>- Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
>>>- Association with remote system [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
>>>- Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
>>>- Association with remote system [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
>>>- Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
>>>- Association with remote system [akka.tcp:
>>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, 

If group.id changes, will the Kafka consumer of a Flink job started from a savepoint still pick up the start position saved in state?

2019-10-14 Thread justskinny
Hi all,
According to the documentation, if a job is restored from a checkpoint or savepoint,
the Kafka consumer will use the start position stored in state.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
For a job restored from a savepoint, if group.id has changed, can the Kafka consumer
still pick up the start position saved in state?
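
(For reference, per the linked documentation: the start-position settings and
group.id only apply when the job starts without restored state; when restoring
from a savepoint or checkpoint, the partition offsets stored in the operator
state take precedence. A minimal sketch with placeholder topic, brokers and
group id, assuming env is a StreamExecutionEnvironment:)

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "new-group-id");   // the changed group.id

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // Only used when starting WITHOUT restored state; on restore from a
    // savepoint/checkpoint the offsets saved in state win, regardless of group.id.
    consumer.setStartFromGroupOffsets();
    env.addSource(consumer);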

Re: [flink sql] table in subquery join temporal table raise java.lang.NullPointerException

2019-10-14 Thread Till Rohrmann
Thanks for reporting this issue. I've pulled in Jark and Kurt who might
help you with this problem.

Cheers,
Till

On Sat, Oct 12, 2019 at 5:42 PM hzp  wrote:

> Hi all,
>
> I'm using Flink SQL to join a temporal table in a subquery, but it raises
> java.lang.NullPointerException when executed.
>
> Orders is a table source, and Rates is a temporal table
>
> Here are my sqls:
> // works
> SELECT o_amount * r_amount AS amount
> FROM Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> // sql raise exception
> SELECT o_amount * r_amount AS amount
> FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> The error stack:
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-14 Thread Till Rohrmann
Hi Vijay,

Flink usually writes the checkpoint data to disk first and then writes the
pointer to the files to ZooKeeper. Hence, if you see a ZooKeeper entry,
then the files should be there. I assume that there is no other process
accessing and potentially removing files from the checkpoint directories,
right?

Have you tried to run one of the latest Flink versions? Flink 1.6.2 is no
longer actively supported by the community.

Cheers,
Till

On Fri, Oct 11, 2019 at 11:39 AM Vijay Bhaskar 
wrote:

> Apart from these we have another environment where checkpointing worked
> fine in HA mode with a complete cluster restart. But for one of the jobs we are
> seeing an issue: the checkpoint path is retrieved from ZooKeeper, but that
> checkpoint path cannot be found in persistent storage. I am wondering
> why this would happen in the first place.
> Is there any sync issue between writing the file to the persistent path and
> registering the file with the HA service? For example, the checkpoint has been
> registered in ZooKeeper but has not been written yet when the cluster is
> restarted? I suspect this kind of problem can happen. We are using Flink 1.6.2 in
> production. Is this an issue that was already known and fixed recently?
>
> Regards
> Bhaskar
>
> On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar 
> wrote:
>
>> We were seeing the logs below in production some time ago; after that we
>> stopped HA. Do you think HA is enabled properly, judging from the logs below?
>>
>> Regards
>> Bhaskar
>>
>> 2019-09-24 17:40:17,675 INFO
>>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Starting ZooKeeperLeaderElectionService
>> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>> 2019-09-24 17:40:17,675 INFO
>>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>> 2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //fl...@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:36,000 WARN  

FLINK-13497 / "Could not create file for checking if truncate works" / HDFS

2019-10-14 Thread Adrian Vasiliu
Hello, 
 
We recently upgraded our product from Flink 1.7.2 to Flink 1.9, and we are experiencing repeatedly failing jobs with
java.lang.RuntimeException: Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:645)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:388)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /okd-dev/3fe6b069-43bf-4d86-9762-4f501c9db16e could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1719)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3368)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3292)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347) 

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy49.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy50.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1838)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1638)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:704)
 
Reading through https://issues.apache.org/jira/browse/FLINK-13593, it looks related, but that issue is marked as fixed in 1.9.

The discussion there points to https://issues.apache.org/jira/browse/FLINK-13497, which is marked as unresolved, with the fix targeted for 1.10. Any light on the following:
1/ Would you confirm that our stack trace is related to https://issues.apache.org/jira/browse/FLINK-13497?
2/ Any ETA for a 1.9.x release fixing it?

Thanks,
Adrian Vasiliu



Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-14 Thread Timothy Victor
Thanks for the insight Roman, and also for the GC tips. There are 2
reasons why I wanted to see this memory released. First, as a way to just
confirm my understanding of Flink memory segment handling. Second, I run a
single standalone cluster that runs both streaming and batch jobs, and the
cluster was being killed by the OOM killer (i.e. the Java runtime was
killed, not a JVM exception).

For the second part, I did some napkin calculations and tuned down the
number of TMs on the host. That seems to help a bit, since before, subsequent
batch jobs were being scheduled on fresh TMs which had not allocated memory
before. So as more TMs did work, more memory was used but never released, and
eventually the OS OOM killer stepped in.

My direction now (thanks to all I learned and the input in this thread) is
to

a) Not run streaming and batch jobs on the same cluster. Their memory
models are different enough that this is not a good idea, and I certainly
don't want a streaming job to be impacted by the running of a batch job.

b) Move the batch jobs to a Job Cluster setup running in K8s. I have had a
lot of trouble getting this to run stably due to K8s issues, but I think I am
very close now.

Thanks again

Tim

On Mon, Oct 14, 2019, 3:08 AM Roman Grebennikov  wrote:

> Forced GC does not mean that the JVM will even try to release the freed memory
> back to the operating system. This highly depends on the JVM and garbage
> collector used for your Flink setup, but most probably it's JVM 8 with
> the ParallelGC collector.
>
> ParallelGC is known to be not that aggressive about releasing free heap
> memory back to the OS. I see multiple different solutions here:
> 1. Question yourself why you really need to release any memory back. Is
> there a logical reason behind it? The next time you resubmit the job, the
> memory is going to be reused.
> 2. You can switch to G1GC and use JVM args like "-XX:MaxHeapFreeRatio
> -XX:MinHeapFreeRatio" to make it more aggressive about releasing memory.
> 3. You can use unofficial JVM builds from RedHat with the ShenandoahGC
> backport, which is also able to do the job:
> https://builds.shipilev.net/openjdk-shenandoah-jdk8/
> 4. Flink 1.10 (hopefully) will be able to run on JVM 11, where G1 is
> much more aggressive about releasing memory:
> https://bugs.openjdk.java.net/browse/JDK-8146436
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Sat, Oct 12, 2019, at 08:38, Timothy Victor wrote:
>
> This part about the GC not cleaning up after the job finishes makes
> sense. However, I observed that even after I run "jcmd <pid> GC.run" against
> the task manager process ID, the memory is still not released. This is what
> concerns me.
>
> Tim
>
>
> On Sat, Oct 12, 2019, 2:53 AM Xintong Song  wrote:
>
> Generally yes, with one slight difference.
>
> Once the job is done, the buffer is released by the Flink task manager
> (because pre-allocation is configured to be disabled), but the
> corresponding memory may not be released by the JVM (because no GC cleans it).
> So it's not the task manager that keeps the buffer to be used for the next
> batch job. When the new batch job is running, the task executor allocates
> new buffers, which will use the memory of the previous buffer that the JVM
> hasn't released.
>
> Thank you~
>
> Xintong Song
>
>
>
>
> On Sat, Oct 12, 2019 at 7:28 AM Timothy Victor  wrote:
>
> Thanks Xintong!   In my case both of those parameters are set to false
> (default).  I think I am sort of following what's happening here.
>
> I have one TM with heap size set to 1GB.  When the cluster is started the
> TM doesn't use that 1GB (no allocations).  Once the first batch job is
> submitted I can see the memory roughly go up by 1GB.   I presume this is
> when TM allocates its 1GB on the heap, and if I read correctly this is
> essentially a large byte buffer that is tenured so that it is never GCed.
> Flink writes any pojos (serializes) to this byte buffer and this is to
> essentially circumvent GC for performance.   Once the job is done, this
> byte buffer remains on the heap, and the task manager keeps it to use for
> the next batch job.  This is why I never see the memory go down after a
> batch job is complete.
>
> Does this make sense?  Please let me know what you think.
>
> Thanks
>
> Tim
>
> On Thu, Oct 10, 2019, 11:16 PM Xintong Song  wrote:
>
> I think it depends on your configurations.
> - Are you using on-heap/off-heap managed memory? (configured by
> 'taskmanager.memory.off-heap', by default is false)
>
> - Is managed memory pre-allocated? (configured by
> 'taskmanager.memory.preallocate', by default false)
>
>
> If managed memory is pre-allocated, then the allocated memory segments
> will never be released. If it's not pre-allocated, memory segments should
> be released when the task is finished, but the actual memory will not be
> de-allocated until next GC. Since the job is finished, there may not be
> enough heap activities to trigger the GC. If on-heap memory is 

Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Stephan Ewen
Thank you all for the encouraging feedback! So far the reaction to adding this
to Flink has been exclusively positive, which is really great to see!

To make this happen, here would be the next steps:

(1) As per the bylaws, a contribution like that would need a PMC vote,
because it is a commitment to take this and shepherd
it in the future. I will kick that off next.

(2) The biggest open question in the current discussion would be whether to
go with a separate repository, or put it into Flink core.
Related to the repository discussion is also how to link and present this
on the Flink website.
I will spin off a separate discussion for that, to keep the threads focused.

Best,
Stephan


On Mon, Oct 14, 2019 at 10:16 AM Becket Qin  wrote:

> +1 to adding Stateful Function to Flink. It is a very useful addition to
> the Flink ecosystem.
>
> Given this is essentially a new top-level / first-class API of Flink, it
> seems better to have it in the Flink core repo. This will also avoid letting
> this important new API to be blocked on potential problems of maintaining
> multiple different repositories.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 13, 2019 at 4:48 AM Hequn Cheng  wrote:
>
>> Hi Stephan,
>>
>> Big +1 for adding this to Apache Flink!
>>
>> As for the problem of whether this should be added to the Flink main
>> repository, from my side, I prefer to put it in the main repository. Not
>> only Stateful Functions shares very close relations with the current Flink,
>> but also other libs or modules in Flink can make use of it the other way
>> round in the future. At that time the Flink API stack would also be changed
>> a bit and this would be cool.
>>
>> Best, Hequn
>>
>> On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  wrote:
>>
>>> Hi Stehpan,
>>>
>>> +1 for having Stateful Functions in Flink.
>>>
>>> Before discussing which repository it should belong, I was wondering if
>>> we have reached an agreement of "splitting flink repository" as Piotr
>>> mentioned or not. It seems that it's just no more further discussion.
>>> It's OK for me to add it to core repository. After all almost everything
>>> is in core repository now. But if we decide to split the core repository
>>> someday, I tend to create a separate repository for Stateful Functions. It
>>> might be good time to take the first step of splitting.
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Sat, 12 Oct 2019 at 19:31, Yu Li  wrote:
>>>
 Hi Stephan,

 Big +1 for adding stateful functions to Flink. I believe a lot of users
 would be interested to try this out, and I could imagine how this could
 contribute to reducing the TCO for businesses requiring both streaming
 processing and stateful functions.

 And my 2 cents is to put it into flink core repository since I could
 see a tight connection between this library and flink state.

 Best Regards,
 Yu


 On Sat, 12 Oct 2019 at 17:31, jincheng sun 
 wrote:

> Hi Stephan,
>
> big +1 for adding this great feature to Apache Flink.
>
> Regarding where we should place it, put it into the Flink core repository
> or create a separate repository? I prefer putting it into the main repository and
> look forward to a more detailed discussion of this decision.
>
> Best,
> Jincheng
>
>
> Jingsong Li wrote on Sat, Oct 12, 2019 at 11:32 AM:
>
>> Hi Stephan,
>>
>> big +1 for this contribution. It provides another user interface that
>> is easy to use and popular at this time. These functions are hard for
>> users to write in SQL/TableAPI, while using DataStream is too complex
>> (we've done some StateFun-kind jobs using DataStream before). With
>> statefun, it is very easy.
>>
>> I think it's also a good opportunity to exercise Flink's core
>> capabilities. I looked at stateful-functions-flink briefly, it is very
>> interesting. I think there are many other things Flink can improve. So I
>> think it's a better thing to put it into Flink, and the improvement for 
>> it
>> will be more natural in the future.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz <
>> dwysakow...@apache.org> wrote:
>>
>>> Hi Stephan,
>>>
>>> I think this is a nice library, but what I like more about it is
>>> that it suggests exploring different use-cases. I think it definitely 
>>> makes
>>> sense for the Flink community to explore more lightweight applications 
>>> that
>>> reuses resources. Therefore I definitely think it is a good idea for 
>>> Flink
>>> community to accept this contribution and help maintaining it.
>>>
>>> Personally I'd prefer to have it in a separate repository. There
>>> were a few discussions before where different people were suggesting to
>>> extract connectors and other libraries to separate repositories. 
>>> Moreover I
>>> think it 

Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Zili Chen
+1 to add Stateful Function to FLINK core repository.

Best,
tison.


Becket Qin wrote on Mon, Oct 14, 2019 at 4:16 PM:

> +1 to adding Stateful Function to Flink. It is a very useful addition to
> the Flink ecosystem.
>
> Given this is essentially a new top-level / first-class API of Flink, it
> seems better to have it in the Flink core repo. This will also avoid letting
> this important new API to be blocked on potential problems of maintaining
> multiple different repositories.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 13, 2019 at 4:48 AM Hequn Cheng  wrote:
>
>> Hi Stephan,
>>
>> Big +1 for adding this to Apache Flink!
>>
>> As for the problem of whether this should be added to the Flink main
>> repository, from my side, I prefer to put it in the main repository. Not
>> only Stateful Functions shares very close relations with the current Flink,
>> but also other libs or modules in Flink can make use of it the other way
>> round in the future. At that time the Flink API stack would also be changed
>> a bit and this would be cool.
>>
>> Best, Hequn
>>
>> On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  wrote:
>>
>>> Hi Stehpan,
>>>
>>> +1 for having Stateful Functions in Flink.
>>>
>>> Before discussing which repository it should belong, I was wondering if
>>> we have reached an agreement of "splitting flink repository" as Piotr
>>> mentioned or not. It seems that it's just no more further discussion.
>>> It's OK for me to add it to core repository. After all almost everything
>>> is in core repository now. But if we decide to split the core repository
>>> someday, I tend to create a separate repository for Stateful Functions. It
>>> might be good time to take the first step of splitting.
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Sat, 12 Oct 2019 at 19:31, Yu Li  wrote:
>>>
 Hi Stephan,

 Big +1 for adding stateful functions to Flink. I believe a lot of user
 would be interested to try this out and I could imagine how this could
 contribute to reduce the TCO for business requiring both streaming
 processing and stateful functions.

 And my 2 cents is to put it into flink core repository since I could
 see a tight connection between this library and flink state.

 Best Regards,
 Yu


 On Sat, 12 Oct 2019 at 17:31, jincheng sun 
 wrote:

> Hi Stephan,
>
> big +1 for adding this great feature to Apache Flink.
>
> Regarding where we should place it, put it into the Flink core repository
> or create a separate repository? I prefer putting it into the main repository and
> look forward to a more detailed discussion of this decision.
>
> Best,
> Jincheng
>
>
> Jingsong Li wrote on Sat, Oct 12, 2019 at 11:32 AM:
>
>> Hi Stephan,
>>
>> big +1 for this contribution. It provides another user interface that
>> is easy to use and popular at this time. these functions, It's hard for
>> users to write in SQL/TableApi, while using DataStream is too complex.
>> (We've done some stateFun kind jobs using DataStream before). With
>> statefun, it is very easy.
>>
>> I think it's also a good opportunity to exercise Flink's core
>> capabilities. I looked at stateful-functions-flink briefly, it is very
>> interesting. I think there are many other things Flink can improve. So I
>> think it's a better thing to put it into Flink, and the improvement for 
>> it
>> will be more natural in the future.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz <
>> dwysakow...@apache.org> wrote:
>>
>>> Hi Stephan,
>>>
>>> I think this is a nice library, but what I like more about it is
>>> that it suggests exploring different use-cases. I think it definitely 
>>> makes
>>> sense for the Flink community to explore more lightweight applications 
>>> that
>>> reuses resources. Therefore I definitely think it is a good idea for 
>>> Flink
>>> community to accept this contribution and help maintaining it.
>>>
>>> Personally I'd prefer to have it in a separate repository. There
>>> were a few discussions before where different people were suggesting to
>>> extract connectors and other libraries to separate repositories. 
>>> Moreover I
>>> think it could serve as an example for the Flink ecosystem website[1]. 
>>> This
>>> could be the first project in there and give a good impression that the
>>> community sees potential in the ecosystem website.
>>>
>>> Lastly, I'm wondering if this should go through PMC vote according
>>> to our bylaws[2]. In the end the suggestion is to adopt an existing code
>>> base as is. It also proposes a new programs concept that could result 
>>> in a
>>> shift of priorities for the community in a long run.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> 

Re: 【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

2019-10-14 Thread 兆鹏 黄
Hi,
Thanks for the reply. They are the same issue.

> On Oct 14, 2019, at 5:17 PM, Benoît Paris wrote:
> 
> https://issues.apache.org/jira/browse/FLINK-14200 
> 


Re: 【flink sql】table in subquery join temporal table raise java.lang.NullPointerException

2019-10-14 Thread Benoît Paris
This seems to be related to:

https://issues.apache.org/jira/browse/FLINK-14200  (Temporal Table Function
Joins do not work on Tables (only TableSources) on the query side)
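
For context, here is a minimal sketch (assumed table and field names, Flink 1.9
Table API) of how such a temporal table function is registered and used; per
FLINK-14200, the join is currently only recognized when the query side is
scanned directly from a TableSource, which is why the subquery variant fails:

    // Assumed: tEnv is a StreamTableEnvironment and "RatesHistory" is a registered
    // table with fields r_currency, r_amount, r_proctime.
    Table ratesHistory = tEnv.scan("RatesHistory");
    TemporalTableFunction rates =
        ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
    tEnv.registerFunction("Rates", rates);

    // Works while Orders is scanned directly from its TableSource:
    Table result = tEnv.sqlQuery(
        "SELECT o_amount * r_amount AS amount " +
        "FROM Orders, LATERAL TABLE (Rates(o_proctime)) " +
        "WHERE r_currency = o_currency");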



On Sat, Oct 12, 2019 at 5:56 PM hzp  wrote:

> Hi all,
>
> I'm using Flink SQL to join a temporal table in a subquery, but it
> raises java.lang.NullPointerException when executed.
>
> Orders is a table source, and Rates is a temporal table
>
> Here are my sqls:
> // works
> SELECT o_amount * r_amount AS amount
> FROM Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> // sql raise exception
> SELECT o_amount * r_amount AS amount
> FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> The error stack:
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> 

kafka-json row_time register

2019-10-14 Thread as333vvvvv
My data comes from Kafka and is JSON formatted.
I register the source table with a row_time attribute using timestamps_from_field (Python
API with Flink 1.9.0).
As the API describes, timestamps_from_field can handle long and timestamp fields,
but the JSON schema does not contain a long type, and the original data has no
timestamp data type, so what should I do to solve this problem?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamingFileSink

2019-10-14 Thread Roman Grebennikov
As for StreamingFileSink and compressed output, see the StreamingFileSink.forBulkFormat
and BulkWriter.Factory classes. Simple example (using apache commons-io and
commons-compress):

    val writer = new BulkWriter.Factory[String] {
      override def create(out: FSDataOutputStream): BulkWriter[String] = new BulkWriter[String] {
        val compressed = new GzipCompressorOutputStream(out)
        override def addElement(element: String): Unit = compressed.write(element.getBytes())
        override def flush(): Unit = compressed.flush()
        override def finish(): Unit = compressed.close()
      }
    }

    val sink = StreamingFileSink.forBulkFormat[String](new Path("/some/path"), writer)

There are still some usability issues with StreamingFileSink (like not being 
able to customize the resulting file names), but they are already going to be 
fixed in Flink 1.10.

Roman Grebennikov | g...@dfdx.me


On Fri, Oct 11, 2019, at 23:07, John O wrote:
> Hello,

> 

> Question 1

> I don’t see any reference material showing how to write compressed (gzip) 
> files with StreamingFileSink. Can someone point me in the right direction?

> 

> Question 2

> We currently have a use case for a “StreamingFileProcessFunction”. Basically 
> we need an output for the StreamingFileSink that will be used by a downstream 
> processor. What would be the best way to implement this feature?

> 

> 

> Best,

> Song



Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Becket Qin
+1 to adding Stateful Function to Flink. It is a very useful addition to
the Flink ecosystem.

Given this is essentially a new top-level / first-class API of Flink, it
seems better to have it in the Flink core repo. This will also avoid letting
this important new API to be blocked on potential problems of maintaining
multiple different repositories.

Thanks,

Jiangjie (Becket) Qin

On Sun, Oct 13, 2019 at 4:48 AM Hequn Cheng  wrote:

> Hi Stephan,
>
> Big +1 for adding this to Apache Flink!
>
> As for the problem of whether this should be added to the Flink main
> repository, from my side, I prefer to put it in the main repository. Not
> only Stateful Functions shares very close relations with the current Flink,
> but also other libs or modules in Flink can make use of it the other way
> round in the future. At that time the Flink API stack would also be changed
> a bit and this would be cool.
>
> Best, Hequn
>
> On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  wrote:
>
>> Hi Stehpan,
>>
>> +1 for having Stateful Functions in Flink.
>>
>> Before discussing which repository it should belong, I was wondering if
>> we have reached an agreement of "splitting flink repository" as Piotr
>> mentioned or not. It seems that it's just no more further discussion.
>> It's OK for me to add it to core repository. After all almost everything
>> is in core repository now. But if we decide to split the core repository
>> someday, I tend to create a separate repository for Stateful Functions. It
>> might be good time to take the first step of splitting.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Sat, 12 Oct 2019 at 19:31, Yu Li  wrote:
>>
>>> Hi Stephan,
>>>
>>> Big +1 for adding stateful functions to Flink. I believe a lot of user
>>> would be interested to try this out and I could imagine how this could
>>> contribute to reduce the TCO for business requiring both streaming
>>> processing and stateful functions.
>>>
>>> And my 2 cents is to put it into flink core repository since I could see
>>> a tight connection between this library and flink state.
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Sat, 12 Oct 2019 at 17:31, jincheng sun 
>>> wrote:
>>>
 Hi Stephan,

 big +1 for adding this great feature to Apache Flink.

 Regarding where we should place it, put it into the Flink core repository
 or create a separate repository? I prefer putting it into the main repository and
 look forward to a more detailed discussion of this decision.

 Best,
 Jincheng


 Jingsong Li wrote on Sat, Oct 12, 2019 at 11:32 AM:

> Hi Stephan,
>
> big +1 for this contribution. It provides another user interface that
> is easy to use and popular at this time. these functions, It's hard for
> users to write in SQL/TableApi, while using DataStream is too complex.
> (We've done some stateFun kind jobs using DataStream before). With
> statefun, it is very easy.
>
> I think it's also a good opportunity to exercise Flink's core
> capabilities. I looked at stateful-functions-flink briefly, it is very
> interesting. I think there are many other things Flink can improve. So I
> think it's a better thing to put it into Flink, and the improvement for it
> will be more natural in the future.
>
> Best,
> Jingsong Lee
>
> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz <
> dwysakow...@apache.org> wrote:
>
>> Hi Stephan,
>>
>> I think this is a nice library, but what I like more about it is that
>> it suggests exploring different use-cases. I think it definitely makes
>> sense for the Flink community to explore more lightweight applications 
>> that
>> reuses resources. Therefore I definitely think it is a good idea for 
>> Flink
>> community to accept this contribution and help maintaining it.
>>
>> Personally I'd prefer to have it in a separate repository. There were
>> a few discussions before where different people were suggesting to 
>> extract
>> connectors and other libraries to separate repositories. Moreover I think
>> it could serve as an example for the Flink ecosystem website[1]. This 
>> could
>> be the first project in there and give a good impression that the 
>> community
>> sees potential in the ecosystem website.
>>
>> Lastly, I'm wondering if this should go through PMC vote according to
>> our bylaws[2]. In the end the suggestion is to adopt an existing code 
>> base
>> as is. It also proposes a new programs concept that could result in a 
>> shift
>> of priorities for the community in a long run.
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>>
>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
>> On 11/10/2019 13:12, Till Rohrmann wrote:
>>
>> Hi Stephan,

Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-14 Thread Roman Grebennikov
Forced GC does not mean that JVM will even try to release the freed memory back 
to the operating system. This highly depends on the JVM and garbage collector 
used for your Flink setup, but most probably it's the jvm8 with the ParallelGC 
collector.

ParallelGC is known to be not that aggressive about releasing free heap memory 
back to the OS. I see multiple possible solutions here:
1. Ask yourself whether you really need to release any memory back. Is 
there a logical reason behind it? The next time you resubmit the job, the memory 
is going to be reused.
2. You can switch to G1GC and use JVM args like "-XX:MaxHeapFreeRatio" and 
"-XX:MinHeapFreeRatio" to make it more aggressive about releasing memory (see 
the sketch below).
3. You can use unofficial JVM builds from RedHat with a ShenandoahGC backport, 
which is also able to do the job: 
https://builds.shipilev.net/openjdk-shenandoah-jdk8/
4. Flink 1.10 (hopefully) will be able to run on JVM 11, where G1 is much 
more aggressive about releasing memory: 
https://bugs.openjdk.java.net/browse/JDK-8146436
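
For option 2, a minimal sketch of how this could look (assuming a standalone setup 
where the TaskManager JVM options are passed via env.java.opts.taskmanager in 
flink-conf.yaml; the ratio values below are only examples, and note that G1 on 
JDK 8 typically shrinks the heap only on a full GC):

env.java.opts.taskmanager: "-XX:+UseG1GC -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=30"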

Roman Grebennikov | g...@dfdx.me


On Sat, Oct 12, 2019, at 08:38, Timothy Victor wrote:
> This part about the GC not cleaning up after the job finishes makes sense. 
> However, I observed that even after I run a "jcmd  GC.run" on the task 
> manager process ID, the memory is still not released. This is what concerns me.
> 
> Tim
> 
> 
> On Sat, Oct 12, 2019, 2:53 AM Xintong Song  wrote:
>> Generally yes, with one slight difference. 
>> 
>> Once the job is done, the buffer is released by the flink task manager (because 
>> pre-allocation is configured to be disabled), but the corresponding memory 
>> may not be released by the JVM (because no GC has cleaned it up). So it's not the 
>> task manager that keeps the buffer for the next batch job. When the 
>> new batch job is running, the task executor allocates new buffers, which 
>> will use the memory of the previous buffers that the JVM hasn't released.
>> 
>> Thank you~

>> Xintong Song

>> 

>> 
>> 
>> On Sat, Oct 12, 2019 at 7:28 AM Timothy Victor  wrote:
>>> Thanks Xintong! In my case both of those parameters are set to false 
>>> (the default). I think I am sort of following what's happening here.
>>> 
>>> I have one TM with the heap size set to 1GB. When the cluster is started the TM 
>>> doesn't use that 1GB (no allocations). Once the first batch job is 
>>> submitted I can see the memory roughly go up by 1GB. I presume this is when 
>>> the TM allocates its 1GB on the heap, and if I read correctly this is 
>>> essentially a large byte buffer that is tenured so that it is never GCed. 
>>> Flink serializes any POJOs into this byte buffer, essentially to 
>>> circumvent GC for performance. Once the job is done, this byte 
>>> buffer remains on the heap, and the task manager keeps it to use for the 
>>> next batch job. This is why I never see the memory go down after a batch 
>>> job is complete. 
>>> 
>>> Does this make sense? Please let me know what you think.
>>> 
>>> Thanks
>>> 
>>> Tim
>>> 
>>> On Thu, Oct 10, 2019, 11:16 PM Xintong Song  wrote:
 I think it depends on your configuration.
 - Are you using on-heap or off-heap managed memory? (configured by 
 'taskmanager.memory.off-heap', false by default)
 - Is managed memory pre-allocated? (configured by 
 'taskmanager.memory.preallocate', false by default)
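
 For reference, a minimal flink-conf.yaml sketch with these two options at their 
 defaults (shown only to illustrate where they would be set in a pre-1.10 setup):

 taskmanager.memory.off-heap: false
 taskmanager.memory.preallocate: false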

 

 If managed memory is pre-allocated, then the allocated memory segments 
 will never be released. If it's not pre-allocated, memory segments should 
 be released when the task is finished, but the actual memory will not be 
 de-allocated until the next GC. Since the job is finished, there may not be 
 enough heap activity to trigger a GC. If on-heap memory is used, you 
 may not be able to observe the TM memory usage decreasing, because the JVM 
 heap size does not scale down. Only if off-heap memory is used might you 
 be able to observe the TM memory usage decreasing after a GC, but not 
 from a jmap dump, because jmap reports heap memory usage only.

 

 Besides, I don't think you need to worry about whether memory is released 
 after one job is finished. Sometimes flink/jvm do not release memory after 
 jobs/tasks finish, so that it can be reused directly by other 
 jobs/tasks, in order to reduce allocation/deallocation overhead and 
 optimize performance.

 

 Thank you~

 Xintong Song

 

 
 
 On Thu, Oct 10, 2019 at 7:55 PM Timothy Victor  wrote:
> After a batch job finishes in a flink standalone cluster, I notice that 
> the memory isn't freed up. I understand Flink uses its own memory 
> manager and just allocates a large tenured byte array that is not GC'ed. 
> But does the memory used in this byte array get released when the batch 
> job is done?
> 
> The scenario I am facing is that I am running a series of scheduled batch 
> jobs on a standalone cluster with 

Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Kostas Kloudas
Hi all,

Big +1 for contributing Stateful Functions to Flink, and as for the
main question at hand, I would vote for putting it in the main
repository.

I understand that this can couple the release cadences of Flink and
Stateful Functions, although I think the pros of having a "you break
it, you fix it" policy outweigh the cons of tying the release cadences.

Looking forward to the integration and the new use cases it may bring!

Cheers,
Kostas

On Mon, Oct 14, 2019 at 9:35 AM Dian Fu  wrote:
>
> Hi Stephan,
>
> Big +1 for adding stateful functions to Apache Flink! The use cases unlocked 
> with this feature are very interesting and promising.
>
> Regarding whether to place it into the Flink core repository, personally I 
> prefer to put it in the main repository. This feature introduces a new set of 
> APIs and it will support a new set of applications. It enriches the API stack 
> of Apache Flink. This is somewhat similar to the Table API & SQL, State 
> Processor API, CEP library, etc. If the applications supported by this 
> feature are important enough for Flink, it's more appropriate to put it 
> directly into the main repository.
>
> Regards,
> Dian
>
> > On Oct 13, 2019, at 10:47 AM, Hequn Cheng wrote:
> >
> > Hi Stephan,
> >
> > Big +1 for adding this to Apache Flink!
> >
> > As for the question of whether this should be added to the Flink main 
> > repository, from my side, I prefer to put it in the main repository. Not 
> > only does Stateful Functions share a very close relation with the current Flink, 
> > but other libs or modules in Flink can also make use of it the other way 
> > round in the future. At that point the Flink API stack would also change 
> > a bit, and this would be cool.
> >
> > Best, Hequn
> >
> > On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  > > wrote:
> > Hi Stephan,
> >
> > +1 for having Stateful Functions in Flink.
> >
> > Before discussing which repository it should belong to, I was wondering whether we 
> > have reached an agreement on "splitting the flink repository" as Piotr 
> > mentioned. It seems that there was just no further discussion.
> > It's OK for me to add it to the core repository. After all, almost everything is 
> > in the core repository now. But if we decide to split the core repository 
> > someday, I would tend to create a separate repository for Stateful Functions. It 
> > might be a good time to take the first step of splitting.
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Sat, 12 Oct 2019 at 19:31, Yu Li  > > wrote:
> > Hi Stephan,
> >
> > Big +1 for adding stateful functions to Flink. I believe a lot of users 
> > would be interested to try this out, and I could imagine how this could 
> > contribute to reducing the TCO for businesses requiring both stream 
> > processing and stateful functions.
> >
> > And my 2 cents is to put it into flink core repository since I could see a 
> > tight connection between this library and flink state.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Sat, 12 Oct 2019 at 17:31, jincheng sun  > > wrote:
> > Hi Stephan,
> >
> > Big +1 for adding this great feature to Apache Flink.
> >
> > Regarding where we should place it, should it go into the Flink core repository or 
> > a separate repository? I prefer to put it into the main repository, and I am 
> > looking forward to a more detailed discussion on this decision.
> >
> > Best,
> > Jincheng
> >
> >
> > Jingsong Li mailto:jingsongl...@gmail.com>> wrote on Sat, Oct 12, 2019 at 11:32 AM:
> > Hi Stephan,
> >
> > Big +1 for this contribution. It provides another user interface that is 
> > easy to use and popular at this time. These kinds of functions are hard for users 
> > to write in SQL/Table API, while using DataStream is too complex (we have 
> > done some StateFun-style jobs using DataStream before). With Statefun, it is 
> > very easy.
> >
> > I think it's also a good opportunity to exercise Flink's core capabilities. 
> > I looked at stateful-functions-flink briefly, and it is very interesting. I 
> > think there are many other things Flink can improve. So I think it is 
> > better to put it into Flink, and the improvements to it will come more 
> > naturally in the future.
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz  > > wrote:
> > Hi Stephan,
> >
> > I think this is a nice library, but what I like more about it is that it 
> > suggests exploring different use cases. I think it definitely makes sense 
> > for the Flink community to explore more lightweight applications that 
> > reuse resources. Therefore I definitely think it is a good idea for the Flink 
> > community to accept this contribution and help maintain it.
> >
> > Personally I'd prefer to have it in a separate repository. There were a few 
> > discussions before where different people suggested extracting 
> > connectors and other libraries to separate repositories. Moreover I think 
> > it could serve as 

Re: add() method of AggregateFunction not called even though new watermark is emitted

2019-10-14 Thread Dawid Wysakowicz
Hi Vijay,

Could you check whether the Watermark for the aggregate operator advances?
You should be able to check that in the Flink WebUI. Could it be that
the Watermark does not advance for all of the upstream operators? The
watermark for a particular operator is the minimum of the watermarks received
from all of its upstream operators. Therefore, if some of them do not
produce any, the resulting watermark will not advance.
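
As a complement to the WebUI check, here is a minimal debugging sketch (assuming
the Flink 1.9 DataStream API and Map<String, Object> records, which is only my
reading of the snippets below) that logs the watermark an operator actually sees
right before the windowed aggregate:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

// Hypothetical helper, not part of the original job: pass the stream through it
// just before keyBy()/timeWindow() to see whether the watermark ever advances.
public class WatermarkProbe extends ProcessFunction<Map<String, Object>, Map<String, Object>> {
    @Override
    public void processElement(Map<String, Object> value,
                               Context ctx,
                               Collector<Map<String, Object>> out) {
        // currentWatermark() stays at Long.MIN_VALUE until a watermark has been
        // received from every upstream input, which is exactly the situation in
        // which a time window never fires.
        System.out.println("element ts=" + ctx.timestamp()
                + ", current watermark=" + ctx.timerService().currentWatermark());
        out.collect(value);
    }
}

// usage (sketch): kinesisStream.process(new WatermarkProbe()).keyBy(...)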

Best,

Dawid

On 11/10/2019 21:37, Vijay Balakrishnan wrote:
> Hi,
> Here is my issue with *Event Processing*: the *add() method of
> MGroupingWindowAggregate is not being called* even though a new watermark
> is fired.
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*, and I get
> json back)
> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
> bound of 10 (have tried 3000, 3). *It fires a new WaterMark* with each
> incoming record, but the *windowStream.aggregate method doesn't seem to
> fire* and I *don't see the add() method of MGroupingWindowAggregate
> called*. I *can see the newWaterMark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*.
> 4. I have tried with a timeWindow of 1m and 15s.
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.LATEST.name
> ());//LATEST
> FlinkKinesisConsumer> kinesisConsumer = new
> FlinkKinesisConsumer<>(
>                 "kinesisTopicRead", new
> MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>
> DataStream> kinesisStream;
> RichSinkFunction influxSink;
>
> DataStreamSource> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new
> *MonitoringTSWAssigner*(bound));
> influxSink = pms.createInfluxMonitoringSink();
> ..
> ...timeWindow = Time.seconds(*timeIntervalL*);//tried with
> timeIntervalL=15s, 1m 
>
> KeyedStream, MonitoringTuple>
> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream, MonitoringTuple, TimeWindow>
> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream enrichedMGStream =
> *windowStream.aggregate*(//*<= never reaches here ?*
>         *new MGroupingWindowAggregate(interval)*,
>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
> AssignerWithPunctuatedWatermarks> {
>     private long bound = 5 * (long) 1000;//5 secs out of order bound
> in millisecs
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public long extractTimestamp(Map monitoring, long
> previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
> return extractedTS;//return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map monitoring) {
>         final String eventTimestamp =
> monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String)
> monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map
> monitoring, long extractedTimestamp) {
>         long extractedTS = getExtractedTS(monitoring);
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
> *AggregateFunction*, Map,
> Map> {
>     private final String interval;
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>     public Map createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map add(Map monitoring,
> Map timedMap) {
> .
> }
>
> .
>
> }
>
> TIA,
>
>        




Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Dian Fu
Hi Stephan,

Big +1 for adding stateful functions to Apache Flink! The use cases unlocked 
with this feature are very interesting and promising.

Regarding whether to place it into the Flink core repository, personally I 
prefer to put it in the main repository. This feature introduces a new set of 
APIs and it will support a new set of applications. It enriches the API stack 
of Apache Flink. This is somewhat similar to the Table API & SQL, State 
Processor API, CEP library, etc. If the applications supported by this feature 
are important enough for Flink, it's more appropriate to put it directly into 
the main repository.

Regards,
Dian

> On Oct 13, 2019, at 10:47 AM, Hequn Cheng wrote:
> 
> Hi Stephan,
> 
> Big +1 for adding this to Apache Flink! 
> 
> As for the question of whether this should be added to the Flink main 
> repository, from my side, I prefer to put it in the main repository. Not only 
> does Stateful Functions share a very close relation with the current Flink, but 
> other libs or modules in Flink can also make use of it the other way round in 
> the future. At that point the Flink API stack would also change a bit and 
> this would be cool.
> 
> Best, Hequn
> 
> On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  > wrote:
> Hi Stephan,
> 
> +1 for having Stateful Functions in Flink.
> 
> Before discussing which repository it should belong to, I was wondering whether we 
> have reached an agreement on "splitting the flink repository" as Piotr mentioned. 
> It seems that there was just no further discussion. 
> It's OK for me to add it to the core repository. After all, almost everything is 
> in the core repository now. But if we decide to split the core repository 
> someday, I would tend to create a separate repository for Stateful Functions. It 
> might be a good time to take the first step of splitting.
> 
> Thanks,
> Biao /'bɪ.aʊ/
> 
> 
> 
> On Sat, 12 Oct 2019 at 19:31, Yu Li  > wrote:
> Hi Stephan,
> 
> Big +1 for adding stateful functions to Flink. I believe a lot of users would 
> be interested to try this out, and I could imagine how this could contribute 
> to reducing the TCO for businesses requiring both stream processing and 
> stateful functions.
> 
> And my 2 cents is to put it into flink core repository since I could see a 
> tight connection between this library and flink state.
> 
> Best Regards,
> Yu
> 
> 
> On Sat, 12 Oct 2019 at 17:31, jincheng sun  > wrote:
> Hi Stephan,
> 
> Big +1 for adding this great feature to Apache Flink.
> 
> Regarding where we should place it, should it go into the Flink core repository or 
> a separate repository? I prefer to put it into the main repository, and I am 
> looking forward to a more detailed discussion on this decision.
> 
> Best,
> Jincheng
> 
> 
> Jingsong Li mailto:jingsongl...@gmail.com>> wrote on Sat, Oct 12, 2019 at 11:32 AM:
> Hi Stephan,
> 
> Big +1 for this contribution. It provides another user interface that is easy 
> to use and popular at this time. These kinds of functions are hard for users to 
> write in SQL/Table API, while using DataStream is too complex (we have done 
> some StateFun-style jobs using DataStream before). With Statefun, it is very 
> easy.
> 
> I think it's also a good opportunity to exercise Flink's core capabilities. I 
> looked at stateful-functions-flink briefly, and it is very interesting. I think 
> there are many other things Flink can improve. So I think it is better 
> to put it into Flink, and the improvements to it will come more naturally in the 
> future.
> 
> Best,
> Jingsong Lee
> 
> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz  > wrote:
> Hi Stephan,
> 
> I think this is a nice library, but what I like more about it is that it 
> suggests exploring different use cases. I think it definitely makes sense for 
> the Flink community to explore more lightweight applications that reuse 
> resources. Therefore I definitely think it is a good idea for the Flink community 
> to accept this contribution and help maintain it.
> 
> Personally I'd prefer to have it in a separate repository. There were a few 
> discussions before where different people suggested extracting 
> connectors and other libraries to separate repositories. Moreover I think it 
> could serve as an example for the Flink ecosystem website[1]. This could be 
> the first project in there and give a good impression that the community sees 
> potential in the ecosystem website.
> Lastly, I'm wondering if this should go through a PMC vote according to our 
> bylaws[2]. In the end the suggestion is to adopt an existing code base as is. 
> It also proposes a new programs concept that could result in a shift of 
> priorities for the community in the long run.
> Best,
> 
> Dawid
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>  
> 

FLINK WEEKLY 2019/41

2019-10-14 Thread Zili Chen
FLINK WEEKLY 2019/41 

It is a pleasure to share the developments of the FLINK community over the past
week. Last week Stephan Ewen announced Stateful Functions, a general-purpose
computation library built on top of FLINK, at Flink Forward Berlin; with Stateful
Functions, FLINK's use cases can be extended to almost all existing data systems.
For the mailing list link, see the community development part at the end of this
WEEKLY.
User questions

How to control TaskManager resources in Flink 1.8

Resource configuration problems encountered after upgrading FLINK from 1.5 to 1.8
and switching the runtime framework to FLIP-6

File renaming

Customizing, to a certain extent, the names of the files produced by StreamingFileSink

Flink SQL :Unknown or invalid SQL statement.

Limitations of the FLINK SQL Client's SQL support; the create table statement is
not supported

How to write stream data to other Hadoop Cluster by StreamingFileSink

The configuration needed for a FLINK job to write its output to another Hadoop
cluster, so that FLINK can resolve the other cluster's information

Reducing the parallelism when starting from a savepoint

When starting a job from a savepoint, the originally configured parallelism can
be lowered, but the maximum parallelism cannot be changed

Flink checkpoint timeout problem

This article can be consulted when troubleshooting checkpoint problems

Discussion: savepoints still keep the original cluster address after a Flink
cluster migration

Currently a FLINK savepoint stores absolute file paths, so moving it to another
HDFS cluster and starting from it is not supported. As a temporary workaround,
this can be bypassed by the unofficial approach of modifying the meta file

Flink 1.9 Web UI exception log display problem

A Web UI display issue since FLINK 1.9, possibly related to the updated failover
strategy in 1.9; no conclusion yet

Flink StreamingFileSink.forBulkFormat to HDFS

Support for consuming Kafka data into HDFS via an ORC-format Hive table

Group by multiple fields

How to call the API to group by multiple fields

[SURVEY] How do people upgrade their Flink applications?

A survey about upgrading FLINK applications; engineers from Alibaba briefly
described their experience

Backpressure tuning/failure

A question about tuning methods for FLINK backpressure
Development discussions

[DISCUSS] Drop Python 2 support for 1.10

Dian Fu started a discussion about dropping FLINK's Python 2 support in 1.10;
FLINK is currently implementing a new Python API. The discussion has largely
reached consensus and the vote is in progress

Mongo Connector

Vijay Srinivasaraghavan started a discussion about a FLINK Mongo connector

[DISCUSS] FLIP-76: Unaligned checkpoints

Arvid Heise's FLIP-76 has been very well received; the FLIP aims to improve
checkpoint performance under backpressure

[DISCUSS] FLIP-77: Introduce ConfigOptions with Data Types

Timo Walther's FLIP-77 was split out of FLIP-54; as part of evolving FLINK's
configuration, it starts by supporting data type information in ConfigOptions

[SURVEY] How do you use ExternallyInducedSource or WithMasterCheckpointHook

Biao Liu started a survey on how FLINK users use the ExternallyInducedSource and
WithMasterCheckpointHook interfaces. This will help the refactoring of the
CheckpointCoordinator threading model that he is leading, and help ensure the
refactoring does not affect existing usage scenarios
Community development

[PROPOSAL] Contribute Stateful Functions to Apache Flink

Stephan Ewen announced Stateful Functions, a general-purpose computation library
built on top of FLINK, at Flink Forward Berlin; with Stateful Functions, FLINK's
use cases can be extended to almost all existing data systems. This thread
proposes contributing Stateful Functions back to the FLINK code repository. The
discussion currently centers on whether to accept the contribution and on whether
the code should live in a separate repository or be merged into the FLINK main
repository

[VOTE] Release 1.9.1, release candidate #1

The FLINK 1.9.1 release is progressing steadily; several pieces of positive
feedback have been received and the corresponding release page is ready. It is
expected to go out this week