Re: DataStream with one DataSource and two different Sinks with diff. schema.

2018-11-09 Thread Hequn Cheng
Hi Marke,

You can use split() and select() as is shown here[1].
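For your particular case, though, the same records have to reach both sinks, so you may not even need split(): you can simply attach two map/sink chains to the shared, normalized stream. A rough sketch (the parse/format functions and the two sinks below are placeholders for your own implementations):

DataStream<Normalized> normalized = env
        .addSource(mySource)                // shared source
        .map(new ParseAndNormalizeMap());   // shared parsing/normalization

// branch 1: first output format and sink
normalized
        .map(new ToFormatA())
        .addSink(new SinkA());

// branch 2: second output format and sink
normalized
        .map(new ToFormatB())
        .addSink(new SinkB());

Both branches see the same elements, and everything stays in one job.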

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations


On Sat, Nov 10, 2018 at 12:23 AM Marke Builder 
wrote:

> Hi,
>
> what is the recommended way to implement the following use-case for
> DataStream:
> One data source, the same map() functions for parsing and normalization, a
> different map() function per format, and two different sinks for the output?
>
> The (same)data must be stored in both sinks.
> And I prefer one job (related to the same source and map functions)
>
> How I can/should use the split() function for this use-case?
>
> Thanks!
>


Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

2018-11-09 Thread shkob1
If it's running in parallel, aren't you just adding readers that max out
your provisioned throughput? This probably belongs on the Kinesis side rather
than here, but I suggest increasing your number of shards.
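If adding shards is not an option, you could also make the Flink consumer less aggressive so it stays under the per-shard read limits. A rough sketch (constant names as in the connector's ConsumerConfigConstants; values are only illustrative, please check them against your connector version):

Properties consumerConfig = new Properties();
consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
// poll each shard less frequently and fetch more records per call
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000");
// back off longer after a throttled GetRecords call
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");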





Re: java.io.IOException: NSS is already initialized

2018-11-09 Thread Ufuk Celebi
Hey Hao Sun,

- Is this an intermittent failure or permanent? The logs indicate that
some checkpoints completed before the error occurs (e.g. checkpoint
numbers are greater than 1).

- Which Java versions are you using? And which Java image? I've
Googled similar issues that seem to be related to the JVM, e.g. [1].

Best,

Ufuk

[1] 
https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972

On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
>
> Thanks, any insight/help here is appreciated.
>
> On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz  
> wrote:
>>
>> Hi Hao,
>>
>> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who were 
>> recently working with S3, maybe they will have some ideas.
>>
>> Best,
>>
>> Dawid
>>
>> On 03/11/2018 03:09, Hao Sun wrote:
>>
>> Same environment, new error.
>>
>> I can run the same Docker image on my local Mac, but on K8S it gives me
>> this error.
>> I cannot think of any difference between local Docker and K8S Docker.
>>
>> Any hint will be helpful. Thanks
>>
>> 
>>
>> 2018-11-02 23:29:32,981 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
>> ConnectedStreams maxwell.accounts () 
>> switched from state RUNNING to FAILING.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
>> 235 for operator Source: KafkaSource(maxwell.accounts) -> 
>> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> FixedDelayWatermark(maxwell.accounts) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> influxdbSink(maxwell.accounts) (1/1).}
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 235 for 
>> operator Source: KafkaSource(maxwell.accounts) -> 
>> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> FixedDelayWatermark(maxwell.accounts) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> influxdbSink(maxwell.accounts) (1/1).
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: 
>> java.lang.NoClassDefFoundError: Could not initialize class 
>> sun.security.ssl.SSLSessionImpl
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>
>> at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> ... 5 more
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
>> sun.security.ssl.SSLSessionImpl
>> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
>>
>> at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:572)
>>
>> at 
>> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
>> at 
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>> at 
>> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> 

Re: Manually clean SQL keyed state

2018-11-09 Thread shkob1
Thanks Fabian!





Re: AvroInputFormat Serialisation Issue

2018-11-09 Thread Vinay Patil
Hi,

Changing the classloader config to parent-first solved the issue.
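For anyone searching for this later: that is the following entry in flink-conf.yaml (the default resolution order is child-first):

classloader.resolve-order: parent-first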

Regards,
Vinay Patil


On Wed, Nov 7, 2018 at 7:25 AM Vinay Patil  wrote:

> Hi,
>
> Can someone please help here.
>
> On Nov 6, 2018 10:46 PM, "Vinay Patil [via Apache Flink User Mailing List
> archive.]"  wrote:
>
>> Hi,
>>
>> I am facing a similar issue today with Flink 1.6.0 - AvroOutputFormat
>>
>> AvroOutputFormat<GenericRecord> tuple2AvroOutputFormat = new
>> AvroOutputFormat<>(
>> new Path(""), GenericRecord.class);
>>
>> testDataSet
>> .map(new GenerateGenericRecord())
>> .returns(AvroTypeInfo.of(GenericRecord.class))
>> .output(tuple2AvroOutputFormat);
>>
>> Following is the exception (I have enabled the forceAvro config, not sure
>> why
>> it still goes to the Kryo serializer)
>>
>> com.esotericsoftware.kryo.KryoException: Error constructing instance of
>> class: org.apache.avro.Schema$LockableArrayList
>> Serialization trace:
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>>
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>>
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>
>> Please let me know if there is a fix for this issue as I have not faced
>> this
>> problem for DataStreams.
>>
>> Regards,
>> Vinay Patil
>>
>>
>>
>>
>>
>


Re: Flink Web UI does not show specific exception messages when job submission fails

2018-11-09 Thread Cliff Resnick
+1!

On Fri, Nov 9, 2018 at 1:34 PM Gary Yao  wrote:

> Hi,
>
> We only propagate the exception message but not the complete stacktrace
> [1].
> Can you create a ticket for that?
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java#L93
>
> On Tue, Nov 6, 2018 at 6:50 PM Luis Gustavo Oliveira Silva <
> l...@poli.ufrj.br> wrote:
>
>> Hello,
>>
>> I was using Flink 1.4.2 and when submitting jobs through the Web UI, I
>> could see exceptions that would help me debug jobs, such as:
>>
>> We're sorry, something went wrong. The server responded with:
>>>
>>> java.util.concurrent.CompletionException: 
>>> org.apache.flink.util.FlinkException: Could not run the jar.
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
>>> Source)
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>> at java.util.concurrent.FutureTask.run(Unknown Source)
>>> at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
>>>  Source)
>>> at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>>>  Source)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>> at java.lang.Thread.run(Unknown Source)
>>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>>> ... 9 more
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
>>> main method caused an error.
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>>> at 
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>> at 
>>> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>>> ... 8 more
>>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>>> Encountered "." at line 3, column 4.
>>> Was expecting one of:
>>> 
>>> "ORDER" ...
>>> "LIMIT" ...
>>> "OFFSET" ...
>>> "FETCH" ...
>>> "FROM" ...
>>> "," ...
>>> "UNION" ...
>>> "INTERSECT" ...
>>> "EXCEPT" ...
>>> "MINUS" ...
>>>
>>> at 
>>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:81)
>>> at 
>>> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:558)
>>> at 
>>> com.stone.default.rule.Sandbox3$.delayedEndpoint$com$stone$default$rule$Sandbox3$1(Sandbox.scala:112)
>>> at 
>>> com.stone.default.rule.Sandbox3$delayedInit$body.apply(Sandbox.scala:93)
>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>> at 
>>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>> at scala.collection.immutable.List.foreach(List.scala:392)
>>> at 
>>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>> at scala.App$class.main(App.scala:76)
>>> at com.stone.default.rule.Sandbox3$.main(Sandbox.scala:93)
>>> at com.stone.default.rule.Sandbox3.main(Sandbox.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> at java.lang.reflect.Method.invoke(Unknown Source)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>>> ... 13 more
>>> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
>>> at line 3, column 4.
>>> Was expecting one of:
>>> 
>>> "ORDER" ...
>>> "LIMIT" ...
>>> "OFFSET" ...
>>> "FETCH" ...
>>> "FROM" ...
>>> "," ...
>>> "UNION" ...
>>> "INTERSECT" ...
>>> "EXCEPT" ...
>>> "MINUS" ...
>>>
>>> at 
>>> org.apache.calcite.sql.parser.impl.SqlParserImpl.convertException(SqlParserImpl.java:350)
>>> at 
>>> org.apache.calcite.sql.parser.impl.SqlParserImpl.normalizeException(SqlParserImpl.java:131)
>>> at 
>>> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:138)
>>> at 

Re: Flink Web UI does not show specific exception messages when job submission fails

2018-11-09 Thread Gary Yao
Hi,

We only propagate the exception message but not the complete stacktrace [1].
Can you create a ticket for that?

Best,
Gary

[1]
https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java#L93

On Tue, Nov 6, 2018 at 6:50 PM Luis Gustavo Oliveira Silva <
l...@poli.ufrj.br> wrote:

> Hello,
>
> I was using Flink 1.4.2 and when submitting jobs through the Web UI, I
> could see exceptions that would help me debug jobs, such as:
>
> We're sorry, something went wrong. The server responded with:
>>
>> java.util.concurrent.CompletionException: 
>> org.apache.flink.util.FlinkException: Could not run the jar.
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>>  at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
>> Source)
>>  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>  at java.util.concurrent.FutureTask.run(Unknown Source)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
>>  Source)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>>  Source)
>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>  at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>>  ... 9 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
>> main method caused an error.
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>>  at 
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>  at 
>> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>>  ... 8 more
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>> Encountered "." at line 3, column 4.
>> Was expecting one of:
>> 
>> "ORDER" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "FETCH" ...
>> "FROM" ...
>> "," ...
>> "UNION" ...
>> "INTERSECT" ...
>> "EXCEPT" ...
>> "MINUS" ...
>>
>>  at 
>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:81)
>>  at 
>> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:558)
>>  at 
>> com.stone.default.rule.Sandbox3$.delayedEndpoint$com$stone$default$rule$Sandbox3$1(Sandbox.scala:112)
>>  at 
>> com.stone.default.rule.Sandbox3$delayedInit$body.apply(Sandbox.scala:93)
>>  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>  at 
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.collection.immutable.List.foreach(List.scala:392)
>>  at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>  at scala.App$class.main(App.scala:76)
>>  at com.stone.default.rule.Sandbox3$.main(Sandbox.scala:93)
>>  at com.stone.default.rule.Sandbox3.main(Sandbox.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>  at java.lang.reflect.Method.invoke(Unknown Source)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>>  ... 13 more
>> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
>> at line 3, column 4.
>> Was expecting one of:
>> 
>> "ORDER" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "FETCH" ...
>> "FROM" ...
>> "," ...
>> "UNION" ...
>> "INTERSECT" ...
>> "EXCEPT" ...
>> "MINUS" ...
>>
>>  at 
>> org.apache.calcite.sql.parser.impl.SqlParserImpl.convertException(SqlParserImpl.java:350)
>>  at 
>> org.apache.calcite.sql.parser.impl.SqlParserImpl.normalizeException(SqlParserImpl.java:131)
>>  at 
>> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:138)
>>  at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:163)
>>  at 
>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:77)
>>   

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi Gary,
Bang on the money. I did not have a watermark assigner, and once I put that
in, the code entered the process() method.
Thanks a ton for your help - lifesaver!

DataStream<Monitoring> kinesisStream = env
.addSource(kinesisConsumer)
.assignTimestampsAndWatermarks(new MonitoringAssigner())//<=
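The assigner itself is nothing fancy; roughly along these lines (just a sketch - the timestamp getter on the Monitoring POJO and the allowed lateness are placeholders):

public class MonitoringAssigner extends BoundedOutOfOrdernessTimestampExtractor<Monitoring> {

    public MonitoringAssigner() {
        super(Time.seconds(3)); // tolerated out-of-orderness, tune as needed
    }

    @Override
    public long extractTimestamp(Monitoring element) {
        return element.getEventTimestamp(); // assumed epoch-millis field on the POJO
    }
}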



On Fri, Nov 9, 2018 at 10:02 AM Gary Yao  wrote:

> Hi,
>
> You are using event time but are you assigning watermarks [1]? I do not
> see it
> in the code.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Any help is appreciated.Dug into this. *I can see the deserialized
>> output log from FlinkKinesisConsumer deserialization but it keeps looping
>> to pull from Kinesis Stream but never gets into the Windowing operation for
>> process() or apply().*
>>
>> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
>> and the deserialized output never seems to get into the apply() or
>> process() method of a Windowing operation. I can see the logs of
>> MonitoringMapKinesisSchema deserializing data back successfully from
>> Kinesis and converting into a POJO.
>>
>> Code:
>>
>> *//Create environment*:
>> StreamExecutionEnvironment env;
>> if (local) {
>> Configuration configuration = new Configuration();
>> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>> env = StreamExecutionEnvironment.createLocalEnvironment(1,
>> configuration);
>> } else {
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> }
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> *//create FlinkKinesisConsumer*
>> Properties kinesisConsumerConfig = new Properties();
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>> "1");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>> "2000");
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>> "TRIM_HORIZON");
>> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
>> kinesisTopicRead, new MonitoringMapKinesisSchema(),
>> kinesisConsumerConfig);*//deserialization works fine*
>> DataStream kinesisStream = env
>> .addSource(kinesisConsumer);
>> KeyedStream>
>> enrichedComponentInstanceStream1Key = kinesisStream
>> .keyBy(new KeySelector> String>>() {
>> public Tuple3
>> getKey(Monitoring mon) throws Exception {
>> return new Tuple3> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>> }
>> });
>>
>> WindowedStream, TimeWindow>
>> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>>
>> .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>
>> DataStream enrichedComponentInstanceStream1 =
>> enrichedComponentInstanceStream1Win
>> //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))
>> .process(new Window5SecProcessing());*//never gets in
>> here*
>> //Gets into Window5SecProcessing.open() method during initialization but
>> never into the process method 
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction> String, String>, TimeWindow> {
>>
>> private transient String interval;
>> private transient String gameId;
>> private transient String keyType;
>> private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>>
>> private transient ValueState total5SecCountState;
>> private transient ValueStateDescriptor
>> total5SecCountValueStateDescriptor;
>> public Window5SecProcessing() {
>>
>> }
>>
>> public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>> this.gameId = gameId;
>> this.interval = interval;
>> this.keyType = keyType;
>> }
>>
>> @Override
>> public void clear(Context context) throws Exception {
>> super.clear(context);
>> KeyedStateStore keyedStateStore = context.windowState();
>>
>> keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
>> }
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> super.open(parameters);
>> logger.debug("Gets in here fine -Window5SecProcessing
>> -Entered open - parameters:{}", parameters);
>> com.codahale.metrics.Histogram fiveSecHist =
>> new com.codahale.metrics.Histogram(new
>> SlidingTimeWindowReservoir(5, 

Re: Per job cluster doesn't shut down after the job is canceled

2018-11-09 Thread Gary Yao
Hi Paul,

Can you share the complete logs, or at least the logs after invoking the
cancel command?

If you want to debug it yourself, check if
MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how
the
jobTerminationFuture is used.

Best,
Gary

[1]
https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141


On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:

> Hi,
>
> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN
> cluster doesn’t shut down after the job is canceled successfully. The only
> errors I found in jobmanager’s log are as below (the second one appears
> multiple times):
>
> ```
>
> 2018-11-07 09:48:38,663 WARN  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
> notifying JobStatusListener
> java.lang.IllegalStateException: Incremented the completed number of 
> checkpoints without incrementing the in progress checkpoints before.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> 2018-11-07 09:54:52,420 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - 
> Implementation error: Unhandled exception.
> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
>   at 
> 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Gary Yao
Hi,

You are using event time but are you assigning watermarks [1]? I do not see
it
in the code.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan 
wrote:

> Hi,
> Any help is appreciated.Dug into this. *I can see the deserialized output
> log from FlinkKinesisConsumer deserialization but it keeps looping to pull
> from Kinesis Stream but never gets into the Windowing operation for
> process() or apply().*
>
> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
> and the deserialized output never seems to get into the apply() or
> process() method of a Windowing operation. I can see the logs of
> MonitoringMapKinesisSchema deserializing data back successfully from
> Kinesis and converting into a POJO.
>
> Code:
>
> *//Create environment*:
> StreamExecutionEnvironment env;
> if (local) {
> Configuration configuration = new Configuration();
> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
> env = StreamExecutionEnvironment.createLocalEnvironment(1,
> configuration);
> } else {
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> }
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> *//create FlinkKinesisConsumer*
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "1");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> "TRIM_HORIZON");
> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
> kinesisTopicRead, new MonitoringMapKinesisSchema(),
> kinesisConsumerConfig);*//deserialization works fine*
> DataStream kinesisStream = env
> .addSource(kinesisConsumer);
> KeyedStream>
> enrichedComponentInstanceStream1Key = kinesisStream
> .keyBy(new KeySelector String>>() {
> public Tuple3
> getKey(Monitoring mon) throws Exception {
> return new Tuple3 String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
> }
> });
>
> WindowedStream, TimeWindow>
> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>
> .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>
> DataStream enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win
> //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))
> .process(new Window5SecProcessing());*//never gets in
> here*
> //Gets into Window5SecProcessing.open() method during initialization but
> never into the process method 
> private static class Window5SecProcessing extends
> ProcessWindowFunction String, String>, TimeWindow> {
>
> private transient String interval;
> private transient String gameId;
> private transient String keyType;
> private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
>
> private transient ValueState total5SecCountState;
> private transient ValueStateDescriptor
> total5SecCountValueStateDescriptor;
> public Window5SecProcessing() {
>
> }
>
> public Window5SecProcessing(String gameId, String interval, String
> keyType) {
> this.gameId = gameId;
> this.interval = interval;
> this.keyType = keyType;
> }
>
> @Override
> public void clear(Context context) throws Exception {
> super.clear(context);
> KeyedStateStore keyedStateStore = context.windowState();
>
> keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> logger.debug("Gets in here fine -Window5SecProcessing -Entered
> open - parameters:{}", parameters);
> com.codahale.metrics.Histogram fiveSecHist =
> new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
> this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
> total5SecCountValueStateDescriptor =
> new ValueStateDescriptor("total5SecCount",
> Long.class, 0L);
> total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
> }
>
>
> public void process(Tuple3 currentKey1,
> Context ctx, Iterable input, Collector out)
> throws Exception {
> 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi,
Any help is appreciated. Dug into this. *I can see the deserialized output
log from FlinkKinesisConsumer deserialization but it keeps looping to pull
from Kinesis Stream but never gets into the Windowing operation for
process() or apply().*

FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
and the deserialized output never seems to get into the apply() or
process() method of a Windowing operation. I can see the logs of
MonitoringMapKinesisSchema deserializing data back successfully from
Kinesis and converting into a POJO.

Code:

*//Create environment*:
StreamExecutionEnvironment env;
if (local) {
Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
env = StreamExecutionEnvironment.createLocalEnvironment(1,
configuration);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
*//create FlinkKinesisConsumer*
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"1");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
kinesisTopicRead, new MonitoringMapKinesisSchema(),
kinesisConsumerConfig);*//deserialization works fine*
DataStream<Monitoring> kinesisStream = env
.addSource(kinesisConsumer);
KeyedStream<Monitoring, Tuple3<String, String, String>>
enrichedComponentInstanceStream1Key = kinesisStream
.keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
public Tuple3<String, String, String> getKey(Monitoring
mon) throws Exception {
return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
}
});

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow>
enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key

.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
enrichedComponentInstanceStream1Win
//.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
COMPONENT_INSTANCE_OPERATION))
.process(new Window5SecProcessing());*//never gets in here*
//Gets into Window5SecProcessing.open() method during initialization but
never into the process method 
private static class Window5SecProcessing extends
ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

private transient String interval;
private transient String gameId;
private transient String keyType;
private transient org.apache.flink.metrics.Histogram
fiveSecHistogram;

private transient ValueState<Long> total5SecCountState;
private transient ValueStateDescriptor<Long>
total5SecCountValueStateDescriptor;
public Window5SecProcessing() {

}

public Window5SecProcessing(String gameId, String interval, String
keyType) {
this.gameId = gameId;
this.interval = interval;
this.keyType = keyType;
}

@Override
public void clear(Context context) throws Exception {
super.clear(context);
KeyedStateStore keyedStateStore = context.windowState();

keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logger.debug("Gets in here fine -Window5SecProcessing -Entered
open - parameters:{}", parameters);
com.codahale.metrics.Histogram fiveSecHist =
new com.codahale.metrics.Histogram(new
SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
this.fiveSecHistogram = new
DropwizardHistogramWrapper(fiveSecHist);
total5SecCountValueStateDescriptor =
new ValueStateDescriptor("total5SecCount",
Long.class, 0L);
total5SecCountState =
getRuntimeContext().getState(total5SecCountValueStateDescriptor);
}


public void process(Tuple3<String, String, String> currentKey1,
Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
throws Exception {
logger.debug("@@never gets here@@Window5SecProcessing - Entered
process ");//
...
}




On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan 
wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process().I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
> 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Vijay Balakrishnan
Hi Gary,
Just posted the code. Please let me know if that clarifies the problem. I have
been digging into how the FlinkKinesisConsumer's deserialized output gets
passed into the process() or apply() method, to no avail. The coding pattern
I used matches all the flink-examples I have seen for Flink 1.6.1.
TIA,
Vijay

On Fri, Nov 9, 2018 at 9:53 AM Gary Yao  wrote:

> Hi,
>
> If the job is actually running and consuming from Kinesis, the log you
> posted
> is unrelated to your problem. To understand why the process function is not
> invoked, we would need to see more of your code, or you would need to
> provide
> an executable example. The log only shows that all offered slots are
> occupied
> by tasks of your job.
>
> Best,
> Gary
>
> On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>> Code compiles fine. It never gets into the Window5SecProcessing's
>> process().I am able to get data from the Kinesis Consumer and it is
>> deserialized properly when I debug the code. It gets into the
>> Window5SecProcessing.open() method for initialization.
>>
>> Not sure if I am failing with no slots available ???
>> In main():
>>  //trimmed a lot of code
>> *FlinkKinesisConsumer kinesisConsumer =
>> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
>> ...);*
>>
>> *DataStream kinesisStream = env*
>> *.addSource(kinesisConsumer)*
>> *.uid(jobName + "KinesisSource");*
>> *KeyedStream>
>> enrichedComponentInstanceStream1Key = kinesisStream*
>> *.keyBy(new KeySelector> String, String>>() {*
>> *public Tuple3
>> getKey(Monitoring mon) throws Exception {*
>> *return new Tuple3> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
>> *}});*
>>
>> *WindowedStream,
>> TimeWindow> enrichedComponentInstanceStream1Win =
>> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>>
>> *DataStream enrichedComponentInstanceStream1
>> = enrichedComponentInstanceStream1Win*
>> *.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))*
>> *.uid("Component Instance Operation Key Monitoring " +
>> FIVE_SECONDS);*
>> *enrichedComponentInstanceStream1.addSink(new
>> SinkFunction() {*
>> *@Override*
>> *public void invoke(MonitoringGrouping mg, Context context)
>> throws Exception {*
>> *//TODO call ES*
>> *logger.debug("In enrichedComponentInstanceStream1 Sink
>> received mg:{}", mg);*
>> *}*
>> *});*
>> *Window processing class*:
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction> String, String>, TimeWindow> {
>> private transient Histogram fiveSecHist;
>> private transient Histogram fiveMinHist;
>> private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>> private transient org.apache.flink.metrics.Histogram
>> fiveMinHistogram;
>> private transient ValueState total5SecCountState;
>> private transient ValueStateDescriptor
>> total5SecCountValueStateDescriptor;
>>
>> public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>> ...
>> }
>>
>> public void open(Configuration parameters) throws Exception {
>> super.open(parameters);
>> logger.debug("Window5SecProcessing -Entered open -
>> parameters:{}", parameters);//gets here
>> com.codahale.metrics.Histogram fiveSecHist =
>> new com.codahale.metrics.Histogram(new
>> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>> this.fiveSecHistogram = new
>> DropwizardHistogramWrapper(fiveSecHist);
>> total5SecCountValueStateDescriptor =
>> new ValueStateDescriptor("total5SecCount",
>> Long.class, 0L);
>> total5SecCountState =
>> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>> }
>> ..
>>
>>* public void process(Tuple3 currentKey1,
>> Context ctx, Iterable input, Collector out)
>> throws Exception {*
>> *logger.debug("Window5SecProcessing - Entered process
>> ");//never gets here*
>> *Tuple3 currentKey = (Tuple3> String, String>) currentKey1;*
>> **
>> *}*
>>
>> }
>> At 1 point in the logs, I seem to see that there are no slots available
>> ? Is that the problem- how can I fix that if that is the case to test
>> locally on my Mac ??
>> *Log:*
>> flink-akka.actor.default-dispatcher-71 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Slot Pool
>> Status:
>> status: connected to
>> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>> registered TaskManagers: 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Gary Yao
Hi,

If the job is actually running and consuming from Kinesis, the log you
posted
is unrelated to your problem. To understand why the process function is not
invoked, we would need to see more of your code, or you would need to
provide
an executable example. The log only shows that all offered slots are
occupied
by tasks of your job.

Best,
Gary

On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan 
wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process().I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
>  //trimmed a lot of code
> *FlinkKinesisConsumer kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
> ...);*
>
> *DataStream kinesisStream = env*
> *.addSource(kinesisConsumer)*
> *.uid(jobName + "KinesisSource");*
> *KeyedStream>
> enrichedComponentInstanceStream1Key = kinesisStream*
> *.keyBy(new KeySelector String>>() {*
> *public Tuple3
> getKey(Monitoring mon) throws Exception {*
> *return new Tuple3 String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
> *}});*
>
> *WindowedStream,
> TimeWindow> enrichedComponentInstanceStream1Win =
> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>
> *DataStream enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win*
> *.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))*
> *.uid("Component Instance Operation Key Monitoring " +
> FIVE_SECONDS);*
> *enrichedComponentInstanceStream1.addSink(new
> SinkFunction() {*
> *@Override*
> *public void invoke(MonitoringGrouping mg, Context context)
> throws Exception {*
> *//TODO call ES*
> *logger.debug("In enrichedComponentInstanceStream1 Sink
> received mg:{}", mg);*
> *}*
> *});*
> *Window processing class*:
> private static class Window5SecProcessing extends
> ProcessWindowFunction String, String>, TimeWindow> {
> private transient Histogram fiveSecHist;
> private transient Histogram fiveMinHist;
> private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
> private transient org.apache.flink.metrics.Histogram
> fiveMinHistogram;
> private transient ValueState total5SecCountState;
> private transient ValueStateDescriptor
> total5SecCountValueStateDescriptor;
>
> public Window5SecProcessing(String gameId, String interval, String
> keyType) {
> ...
> }
>
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> logger.debug("Window5SecProcessing -Entered open -
> parameters:{}", parameters);//gets here
> com.codahale.metrics.Histogram fiveSecHist =
> new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
> this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
> total5SecCountValueStateDescriptor =
> new ValueStateDescriptor("total5SecCount",
> Long.class, 0L);
> total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
> }
> ..
>
>* public void process(Tuple3 currentKey1,
> Context ctx, Iterable input, Collector out)
> throws Exception {*
> *logger.debug("Window5SecProcessing - Entered process
> ");//never gets here*
> *Tuple3 currentKey = (Tuple3 String, String>) currentKey1;*
> **
> *}*
>
> }
> At 1 point in the logs, I seem to see that there are no slots available
> ? Is that the problem- how can I fix that if that is the case to test
> locally on my Mac ??
> *Log:*
> flink-akka.actor.default-dispatcher-71 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Slot Pool
> Status:
> status: connected to
> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
> *available slots: []*
> allocated slots: [[AllocatedSlot
> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
> pending requests: []
> sharing groups: {
>  5a0ae59368145d715b3cc0d39ba6c05a 
> {
> groupId=5a0ae59368145d715b3cc0d39ba6c05a
> unresolved={}
> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
> 

DataStream with one DataSource and two different Sinks with diff. schema.

2018-11-09 Thread Marke Builder
Hi,

what is the recommended way to implement the following use-case for
DataStream:
One data source, the same map() functions for parsing and normalization, a
different map() function per format, and two different sinks for the output?

The (same)data must be stored in both sinks.
And I prefer one job (related to the same source and map functions)

How I can/should use the split() function for this use-case?

Thanks!


Re: Where is the "Latest Savepoint" information saved?

2018-11-09 Thread Hao Sun
Can we add an option to allow job cluster mode to start from the latest
savepoint? Otherwise I have to somehow get the info from ZK before the job
cluster's container is started by K8s.
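For now the best workaround I can see is to scrape the path from the old cluster's monitoring REST API right after taking the savepoint, before the cluster goes away - roughly like this (endpoint and JSON field names per the 1.6 REST docs linked below, not verified):

URL url = new URL("http://" + jobManagerHost + ":8081/jobs/" + jobId + "/checkpoints");
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
    String json = reader.lines().collect(Collectors.joining());
    // the externalized savepoint path should be under latest -> savepoint -> external_path
    System.out.println(json);
}

But that still means wiring the result into the new job cluster's container myself, hence the question.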

On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:

> Hi Hao,
>
> The savepoint path is stored in ZK, but it’s in binary format, so in order
> to retrieve the path you have to deserialize it back to some Flink internal
> object.
>
> A better approach would be using REST api to get the path. You could find
> it here[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> 
>
> Best,
> Paul Lam
>
>
> 在 2018年11月9日,13:55,Hao Sun  写道:
>
> Since this savepoint path is very useful for application updates, where is
> this information stored? Can we keep it in ZK or S3 for retrieval?
>
> 
>
>
>


Flink Question

2018-11-09 Thread Steve Bistline
I am having problems with the Flink Kinesis adapter. I have some native KCL
code that works fine. I want to replace the .addSource with the CSV String
data that is coming in from my KCL code. How can I do that?


// Consume the data streams from AWS Kinesis stream
DataStream dataStream = env.addSource(new FlinkKinesisConsumer<>(
pt.getRequired("stream"),
new EventSchema(),
kinesisConsumerConfig))
.name("Kinesis Stream Consumer");

Any help would be appreciated

Thanks,

Steve


FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

2018-11-09 Thread Steve Bistline
I am getting this error from the Flink Kinesis Connector. I have a native
KCL app running in parallel with no problems.


Any help would be appreciated


Thanks so much!!


Steve


flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:11,579
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 258 millis (Rate
exceeded for shard shardId- in stream CSV under account
x  . (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
e1c0caa4-8c4c-7738-b59f-4977bc762cf3))

flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:16,844
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 203 millis (Rate
exceeded for shard shardId-0001 in stream CSV under account
x. (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
f7d22c26-96f6-c547-a38d-affe493cd2e1))


Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-09 Thread Flavio Pompermaier
To verify it, just add something after the env.execute() in the WordCount,
like a log or a System.out, or try to run the program twice and submit it
using the REST API or Web UI.
You'll see that the program will run until the first call to execute(), and then
nothing else will happen.
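For example, something as simple as this at the end of the WordCount main() (just a sketch):

JobExecutionResult result = env.execute("WordCount");
// when the jar is submitted through the REST API / Web UI, this line is never
// reached - the program only runs up to the first call to execute()
System.out.println("execute() returned, job id: " + result.getJobID());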


Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-09 Thread Till Rohrmann
Hi Mike,

the job seems to run. It might indeed only be a problem with shutting down
the ZooKeeper utils on the client side after the job has been submitted. I
will try to reproduce it locally. Keep us posted on the state of
CURATOR-466 if something should change.

Cheers,
Till

On Thu, Nov 8, 2018 at 11:17 PM Mikhail Pryakhin 
wrote:

> Hi Till.
> Of course, please find the job bootstrap and YarnJobClusterEntrypoint logs
> attached.
>
> The stacktrace below resembles the bug in Apache Curator
> https://issues.apache.org/jira/browse/CURATOR-466.
>
> java.lang.IllegalStateException: instance must be started before calling
> this method
> at
> org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:375)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.processBackgroundResult(NodeCache.java:288)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.access$300(NodeCache.java:56)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache$3.processResult(NodeCache.java:122)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:749)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:522)
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl$1.processResult(ExistsBuilderImpl.java:137)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:554)
> at
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:505)
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 8 Nov 2018, at 12:12, Till Rohrmann  wrote:
>
> Hi Mike,
>
> could you also send me the YarnJobClusterEntrypoint logs. Thanks!
>
> Cheers,
> Till
>
> On Wed, Nov 7, 2018 at 9:27 PM Mikhail Pryakhin 
> wrote:
>
>> Hi Till,
>> Thank you for your reply.
>> Yes, I’ve upgraded to the latest Flink-1.6.2 and the problem is still
>> there, please find the log file attached.
>>
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>> On 7 Nov 2018, at 18:46, Till Rohrmann  wrote:
>>
>> Hi Mike,
>>
>> have you tried whether the problem also occurs with Flink 1.6.2? If yes,
>> then please share with us the Flink logs with DEBUG log level to further
>> debug the problem.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 26, 2018 at 5:46 PM Mikhail Pryakhin 
>> wrote:
>>
>>> Hi community!
>>>
>>> Right after I upgraded Flink to flink-1.6.1, I get an exception
>>> during job deployment to a YARN cluster.
>>> The job is submitted with ZooKeeper HA enabled, in detached mode.
>>>
>>> The flink yaml contains the following properties:
>>>
>>> high-availability: zookeeper
>>> high-availability.zookeeper.quorum: 
>>> high-availability.zookeeper.storageDir: hdfs:///
>>> high-availability.zookeeper.path.root: 
>>> high-availability.zookeeper.path.namespace: 
>>>
>>> the job is deployed via flink CLI command like the following:
>>>
>>> "${FLINK_HOME}/bin/flink" run \
>>> -m yarn-cluster \
>>> -ynm "${JOB_NAME}-${JOB_VERSION}" \
>>> -yn "${tm_containers}" \
>>> -ys "${tm_slots}" \
>>> -ytm "${tm_memory}" \
>>> -yjm "${jm_memory}" \
>>> -p "${parallelism}" \
>>> -yqu "${queue}" \
>>> -yt "${YARN_APP_PATH}" \
>>> -c "${MAIN_CLASS}" \
>>> -yst \
>>> -yd \
>>> ${class_path} \
>>> "${YARN_APP_PATH}"/"${APP_JAR}"
>>>
>>>
>>> After the job has been successfully deployed, I've got an exception:
>>>
>>> 2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 |
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>>> | Background exception was not retry-able or retry gave up
>>> java.lang.InterruptedException
>>> at java.lang.Object.wait(Native Method)
>>> at java.lang.Object.wait(Object.java:502)
>>> at
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
>>> at
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1097)
>>> at
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
>>> at
>>> org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:274)
>>> at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CreateBuilderImpl$7.performBackgroundOperation(CreateBuilderImpl.java:561)
>>> at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.OperationAndData.callPerformBackgroundOperation(OperationAndData.java:72)
>>> at
>>> 

Re: Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-09 Thread Till Rohrmann
Hi Cliff,

this sounds not right. Could you share the logs of the Yarn cluster
entrypoint with the community for further debugging? Ideally on DEBUG
level. The Yarn logs would also be helpful to fully understand the problem.
Thanks a lot!
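In case it helps: with the default logging setup shipped in the Flink distribution, you can enable DEBUG level by editing conf/log4j.properties before starting the cluster, e.g. changing the root logger line to:

log4j.rootLogger=DEBUG, file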

Cheers,
Till

On Thu, Nov 8, 2018 at 9:59 PM Cliff Resnick  wrote:

> I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
> configuration of 3 slots per TM. The cluster is dedicated to a single job
> that runs at full capacity in "FLIP6" mode. So in this cluster, the
> parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).
>
> When I run the job in 1.6.0, seven Task Managers are spun up as expected.
> But if I run with 1.6.2 only four Task Managers spin up and the job hangs
> waiting for more resources.
>
> Our Flink distribution is set up by script after building from source. So
> aside from flink jars, both 1.6.0 and 1.6.2 directories are identical. The
> job is the same, restarting from savepoint. The problem is repeatable.
>
> Has something changed in 1.6.2, and if so can it be remedied with a config
> change?
>
>
>
>
>
>


ProvisionedThroughputExceededException

2018-11-09 Thread Steve Bistline
I am trying to use the Kinesis Connector and getting the following error
message. I am not getting any data at all because of this. I have a native
KCL client running in parallel and it is receiving data fine.

Any thoughts?


flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:11,579
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 258 millis (Rate
exceeded for shard shardId- in stream CSV under account
496346111546. (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
e1c0caa4-8c4c-7738-b59f-4977bc762cf3))

flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:16,844
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 203 millis (Rate
exceeded for shard shardId-0001 in stream CSV under account
496346111546. (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
f7d22c26-96f6-c547-a38d-affe493cd2e1))


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-09 Thread Till Rohrmann
Could you send us a small example program which we can use to reproduce the
problem?

Cheers,
Till
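
For reference, a minimal client-side program along the lines Till is asking for could look roughly like the sketch below. It only mirrors the snippets quoted further down in this thread; the proxy host and port (9069 is the default queryable state proxy port), the job id, and the UUID are placeholders, and the API shown is the 1.6 QueryableStateClient.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class UuidKeyQueryExample {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        // Must be identical to the descriptor used when the state was made queryable in the job.
        MapStateDescriptor<UUID, String> descriptor =
                new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

        CompletableFuture<MapState<UUID, String>> future = client.getKvState(
                JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
                "rule",
                UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
                TypeInformation.of(new TypeHint<UUID>() {}),
                descriptor);

        MapState<UUID, String> state = future.join();
        for (Map.Entry<UUID, String> entry : state.entries()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}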

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta  wrote:

> Yeah, it IS using Kryo serializer.
>
> Jayant Ameta
>
>
> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann  wrote:
>
>> Hi Jayant, could you check that the UUID key on the TM is actually
>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>> in the constructor of the `AbstractKeyedStateBackend`.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>>
>>> Hi, Jayant
>>>
>>> Your code looks good to me, and I’ve tried Kryo serialization/deserialization
>>> of the UUID class; it all looks okay.
>>>
>>> I'm not sure what is going wrong here. Maybe you can write a very
>>> simple demo to see if it works.
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> Sender: Jayant Ameta
>>> Recipient: bupt_ljy
>>> Cc: Tzu-Li (Gordon) Tai; user <user@flink.apache.org>
>>> Date: Monday, Oct 29, 2018 11:53
>>> Subject: Re: Queryable state when key is UUID - getting Kryo Exception
>>>
>>> Hi Jiayi,
>>> Any further help on this?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta 
>>> wrote:
>>>
 MapStateDescriptor<UUID, String> descriptor =
         new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

 Jayant Ameta


 On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:

> Hi,
>
>Can you show us the descriptor in the codes below?
>
> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>
>>
> Jiayi Liao, Best
>
>
>  Original Message
> Sender: Jayant Ameta
> Recipient: bupt_ljy
> Cc: Tzu-Li (Gordon) Tai; user <user@flink.apache.org>
> Date: Friday, Oct 26, 2018 02:26
> Subject: Re: Queryable state when key is UUID - getting Kryo Exception
>
> Also, I haven't provided any custom serializer in my flink job.
> Shouldn't the same configuration work for queryable state client?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
> wrote:
>
>> Hi Gordon,
>> Following is the stack trace that I'm getting:
>>
>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>> Caused by: java.lang.RuntimeException: Failed request 0.
>> Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>> Serialization trace:
>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>   at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> I am not using any custom serializer, as Jiayi mentioned.
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>>
>>> 

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-09 Thread Aljoscha Krettek
Hi,

I think for this case a model that is similar to how the Streaming File Source 
works should be good. You can have a look at ContinuousFileMonitoringFunction 
and ContinuousFileReaderOperator. The idea is that the first emits splits that 
should be processed and the second is responsible for reading those splits. A 
generic version of that is what I'm proposing for the refactoring of our source 
interface [1] that also comes with a prototype implementation [2].

I think something like this should be adaptable to your case. The split
enumerator would at first only emit file splits downstream; after that it would
emit the Kafka partitions that should be read. The split reader would understand
both file splits and Kafka partitions and could read from both. This still has
some kinks to be worked out when it comes to watermarks, as FLIP-27 is not
finished.

What do you think?

Best,
Aljoscha

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
 

[2] https://github.com/aljoscha/flink/commits/refactor-source-interface 
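
To make that shape concrete, here is a very rough sketch of the enumerator/reader split described above. The split type, the example paths, the partition count, and the use of a plain FlatMapFunction as the "reader" are all illustrative assumptions; this is neither the FLIP-27 API nor a checkpointable implementation, only the two-operator pattern.

import java.io.Serializable;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

/** Hypothetical split: either an S3 sequence file to bootstrap from or a Kafka partition to tail. */
class BootstrapSplit implements Serializable {
    final String filePath;        // non-null during the bootstrap phase
    final Integer kafkaPartition; // non-null during the streaming phase
    BootstrapSplit(String filePath, Integer kafkaPartition) {
        this.filePath = filePath;
        this.kafkaPartition = kafkaPartition;
    }
}

/** Non-parallel "enumerator": hands out all file splits first, then the Kafka partitions. */
class SplitEnumeratorSource extends RichSourceFunction<BootstrapSplit> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<BootstrapSplit> ctx) {
        for (String path : new String[] {"s3://bucket/part-0.seq", "s3://bucket/part-1.seq"}) {
            if (running) {
                ctx.collect(new BootstrapSplit(path, null));
            }
        }
        for (int partition = 0; partition < 4 && running; partition++) {
            ctx.collect(new BootstrapSplit(null, partition));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

/** Parallel "reader": understands both kinds of splits (compare ContinuousFileReaderOperator). */
class SplitReader extends RichFlatMapFunction<BootstrapSplit, String> {
    @Override
    public void flatMap(BootstrapSplit split, Collector<String> out) {
        if (split.filePath != null) {
            out.collect("bootstrapped from " + split.filePath);          // real code: open the InputFormat
        } else {
            out.collect("consuming partition " + split.kafkaPartition);  // real code: Kafka fetch loop
        }
    }
}

// Wiring: env.addSource(new SplitEnumeratorSource()).rebalance().flatMap(new SplitReader());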


> On 1. Nov 2018, at 16:50, Aaron Levin  wrote:
> 
> Hey,
> 
> Thanks for reaching out! I'd love to take a step back and find a better 
> solution, so I'll try to be succinct in what I'm trying to accomplish:
> 
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets 
> files in a specific order).
> * sends a watermark between each sequence file 
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about (see: 
> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink
>  
> ))
>  
> 
> The current solution I have involves a custom InputFormat, InputSplit, and 
> SplitAssignor. It achieves most of these requirements, except I have to 
> extend InputFormatSourceFunction. I have a class that looks like:
> 
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: 
> KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
> 
> There is a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is 
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of InputFormatSourceFunction 
> so I could insert Watermarks between splits. 
> 
> I'd love any suggestions around improving this!
> 
> Best,
> 
> Aaron Levin
> 
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek  > wrote:
> Hi Aaron,
> 
> I'd like to take a step back and understand why you're trying to wrap an 
> InputFormatSourceFunction?
> 
> In my opinion, InputFormatSourceFunction should not be used because it has 
> some shortcomings, the most prominent among them being that it does not support 
> checkpointing, i.e. in case of failure all data will (probably) be read 
> again. I'm saying probably because the interaction of 
> InputFormatSourceFunction with how InputSplits are generated (which relates 
> to that code snippet with the cast you found) could be somewhat "spooky" and 
> lead to weird results in some cases.
> 
> The interface is a remnant of a very early version of the streaming API and 
> should probably be removed soon. I hope we can find a better solution for 
> your problem that fits better with Flink.
> 
> Best,
> Aljoscha
> 
>> On 1. Nov 2018, at 15:30, Aaron Levin > > wrote:
>> 
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can 
>> provide any insight or advice, that would be helpful!
>> 
>> Thanks again.
>> 
>> Best,
>> 
>> Aaron Levin
>> 
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin > > wrote:
>> Hey,
>> 
>> Not sure how convo threading works on this list, so in case the folks CC'd 
>> missed my other response, here's some more info:
>> 
>> First, I appreciate everyone's help! Thank you! 
>> 
>> I wrote several wrappers to try and debug this, including one which is an 
>> exact copy of `InputFormatSourceFunction` which also failed. They all failed 
>> with the same error I detail above. I'll post two of them below. They all 
>> extended `RichParallelSourceFunction` and, as far as I could tell, were 
>> properly initialized (though I may have missed something!). Additionally, 
>> for the two below, if I change `extends RichParallelSourceFunction` to 
>> `extends InputFormatSourceFunction(...,...)`, I no longer receive the 
>> exception. This is what led me to believe the source of the issue was 
>> casting and how I found the line of code where the stream graph is given the 

Re: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

2018-11-09 Thread Fabian Hueske
Hi Arnaud,

Thanks for reporting the issue!

Best, Fabian

On Thu, 8 Nov 2018 at 20:50, LINZ, Arnaud <
al...@bouyguestelecom.fr> wrote:

> 1. FLINK-10832 
>
> Created (with great difficulty, as typing Java code in a JIRA description
> was an awful experience :) )
>
>
>
>
>
> From: LINZ, Arnaud
> Sent: Wednesday, 7 November 2018 11:43
> To: 'user' 
> Subject: RE: Stopping a streaming app from its own code : behaviour
> change from 1.3 to 1.6
>
>
>
> FYI, the code below terminates with version 1.6.0 but does not terminate in
> 1.6.1. I suspect it's a bug rather than a new feature.
>
>
>
> From: LINZ, Arnaud
> Sent: Wednesday, 7 November 2018 11:14
> To: 'user' 
> Subject: RE: Stopping a streaming app from its own code : behaviour
> change from 1.3 to 1.6
>
>
>
> Hello,
>
>
>
> This has nothing to do with HA. All my unit tests involving a streaming
> app now fail with “infinite execution”.
>
> This simple code never ends:
>
> @Test
> public void testFlink162() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
>
> Is this really a new feature or a critical bug?
>
> In the log, the task executor is stopped
>
> [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
>
> But execute() does not return.
>
>
>
> Arnaud
>
>
>
> Log is :
>
> [2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini
> cluster
> (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
>
> [2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster
> (org.apache.flink.runtime.minicluster.MiniCluster:227)
>
> [2018-11-07 11:11:11,636] INFO Starting Metrics Registry
> (org.apache.flink.runtime.minicluster.MiniCluster:238)
>
> [2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics
> will be exposed/reported.
> (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)
>
> [2018-11-07 11:11:11,703] INFO Starting RPC Service(s)
> (org.apache.flink.runtime.minicluster.MiniCluster:249)
>
> [2018-11-07 11:11:12,244] INFO Slf4jLogger started
> (akka.event.slf4j.Slf4jLogger:92)
>
> [2018-11-07 11:11:12,264] INFO Starting high-availability services
> (org.apache.flink.runtime.minicluster.MiniCluster:290)
>
> [2018-11-07 11:11:12,367] INFO Created BLOB server storage directory
> C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108
> (org.apache.flink.runtime.blob.BlobServer:141)
>
> [2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max
> concurrent requests: 50 - max backlog: 1000
> (org.apache.flink.runtime.blob.BlobServer:203)
>
> [2018-11-07 11:11:12,380] INFO Starting ResourceManger
> (org.apache.flink.runtime.minicluster.MiniCluster:301)
>
> [2018-11-07 11:11:12,409] INFO Starting RPC endpoint for
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 .
> (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
>
> [2018-11-07 11:11:12,432] INFO Proposing leadership to contender
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa
> @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
>
> [2018-11-07 11:11:12,439] INFO ResourceManager
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was
> granted leadership with fencing token 86394924fb97bad612b67f526f84406f
> (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)
>
> [2018-11-07 11:11:12,440] INFO Starting the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)
>
> [2018-11-07 11:11:12,442] INFO Received confirmation of leadership for
> leader
> akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 ,
> session=12b67f52-6f84-406f-8639-4924fb97bad6
> (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
>
> [2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory
> C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f
> 

Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-09 Thread Fabian Hueske
Hi,

SQL does not support any custom triggers or timers.
In general, computations are performed once they are complete with respect
to the watermarks (this applies to GROUP BY windows, OVER windows, windowed
and time-versioned joins, etc.).

Best, Fabian

On Fri, 9 Nov 2018 at 05:08, yinhua.dai <
yinhua.2...@outlook.com> wrote:

> I am able to write a single operator as you suggested, thank you.
>
> Then I saw ContinuousProcessingTimeTrigger in the Flink source code; it
> looks like what I am looking for. If there were a way to customize the
> SQL "TUMBLE" window to use this trigger instead of ProcessingTimeTrigger,
> it would solve my problem.
>
> Do you know if there is a way to use a custom trigger in the "TUMBLE"
> window of SQL?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
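
For completeness: the DataStream API (unlike the SQL TUMBLE clause) does allow attaching ContinuousProcessingTimeTrigger to a tumbling window, so one option is to express this particular window outside of SQL. A minimal sketch, where input is an assumed DataStream<Tuple2<String, Long>> of (key, value) pairs:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

input.keyBy(0)
     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
     // emit the current partial aggregate every 10 seconds while the window is still open
     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
     .sum(1)
     .print();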


Re: Manually clean SQL keyed state

2018-11-09 Thread Fabian Hueske
Hi Shahar,

That's not possible at the moment. The SQL API does not provide any knobs
to control state size besides the idle state retention time.
The reason is that it aims to be as accurate as possible.

In the future it might be possible to provide more information to the
system (like constraints in a relational DBMS) that can help to reduce the
memory footprint.

Best, Fabian

On Fri, 9 Nov 2018 at 02:06, shkob1 wrote:

> I have a scenario in which I do a non-windowed group by using SQL, something
> like
>
> "Select count(*) as events, shouldTrigger(..) as shouldTrigger from source
> group by sessionId"
>
> I'm then converting to a retract stream, filtering for "add" messages, then
> further filtering by the "shouldTrigger" field, and sending the result to a
> sink.
>
> While I'm using the query config (idle state retention time), it seems like
> I could reduce the state size by clearing the state of a specific session id
> earlier ("shouldTrigger" marks the end of the session rather than a timed
> window).
>
> Is there a way for me to clear that state, assuming I need to use the SQL
> API?
>
> Thanks!
> Shahar
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
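
For completeness, the idle state retention Fabian refers to is configured on the query config; with sessionId as the grouping key it at least bounds how long the per-session state is kept. A minimal sketch, assuming the 1.6 Table API, with tableEnv and the registered table "source" taken from the job:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

StreamQueryConfig queryConfig = tableEnv.queryConfig();
// State for a sessionId that has been idle for 12 hours may be cleaned up;
// it is guaranteed to be retained for at least 6 hours.
queryConfig.withIdleStateRetentionTime(Time.hours(6), Time.hours(12));

Table result = tableEnv.sqlQuery(
        "SELECT sessionId, count(*) AS events FROM source GROUP BY sessionId");

tableEnv.toRetractStream(result, Row.class, queryConfig)
        .filter(change -> change.f0)   // keep only the "add" messages
        .print();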


Re: Where is the "Latest Savepoint" information saved?

2018-11-09 Thread Paul Lam
Hi Hao,

The savepoint path is stored in ZK, but it’s in binary format, so in order to
retrieve the path you would have to deserialize it back into a Flink-internal object.

A better approach would be to use the REST API to get the path. You can find it
here [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
 


Best,
Paul Lam


> On Nov 9, 2018, at 13:55, Hao Sun wrote:
> 
> Since this savepoint path is very useful for application updates, where is 
> this information stored? Can we keep it in ZK or S3 for retrieval?
> 
> 



Re: FlinkCEP, circular references and checkpointing failures

2018-11-09 Thread Shailesh Jain
Thank you, Stefan. Any idea when we can expect the 1.6.3 release?
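
For reference, the fix Stefan describes below boils down to having duplicate() also duplicate the wrapped element serializer instead of reusing it. A rough sketch of that pattern inside such a wrapping serializer (only the general shape, not the actual FLINK-10816 patch):

@Override
public LockableTypeSerializer<E> duplicate() {
    // A stateful nested serializer must be deep-copied as well; otherwise two threads
    // (e.g. the task thread and an asynchronous checkpoint) end up sharing one instance.
    TypeSerializer<E> duplicatedElementSerializer = elementSerializer.duplicate();
    return duplicatedElementSerializer == elementSerializer
            ? this
            : new LockableTypeSerializer<>(duplicatedElementSerializer);
}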

On Thu, Nov 8, 2018 at 4:28 PM Stefan Richter 
wrote:

> Sure, it is already merged as FLINK-10816.
>
> Best,
> Stefan
>
> On 8. Nov 2018, at 11:53, Shailesh Jain 
> wrote:
>
> Thanks a lot for looking into this issue Stefan.
>
> Could you please let me know the issue ID once you open it? It'll help me
> understand the problem better, and also I could do a quick test in our
> environment once the issue is resolved.
>
> Thanks,
> Shailesh
>
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann 
>> Really good finding Stefan!
>>
>> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I think I can already spot the
>>> problem: LockableTypeSerializer.duplicate() is not properly implemented
>>> because it also has to call duplicate() on the element serialiser that is
>>> passed into the constructor of the new instance. I will open an issue and
>>> fix the problem.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 7. Nov 2018, at 17:17, Till Rohrmann  wrote:
>>>
>>> Hi Shailesh,
>>>
>>> could you maybe provide us with an example program which is able to
>>> reproduce this problem? This would help the community to better debug the
>>> problem. It does not look right and might point towards a bug in Flink. Thanks
>>> a lot!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz 
>>> wrote:
>>>
 This is some problem with serializing your events using Kryo. I'm
 adding Gordon to cc, as he was recently working with serializers. He might
 give you more insight into what is going wrong.

 Best,

 Dawid
 On 25/10/2018 05:41, Shailesh Jain wrote:

 Hi Dawid,

 I've upgraded to Flink 1.6.1 and rebased my changes against the tag
 1.6.1, the only commit on top of 1.6 is this:
 https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

 I ran two separate identical jobs (with and without checkpointing
 enabled), I'm hitting an ArrayIndexOutOfBoundsException (and sometimes NPE) 
 *only
 when checkpointing (HDFS backend) is enabled*, with the below stack
 trace.

 I did see a similar problem with different operators here (
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
 Is this a known issue which is getting addressed?

 Any ideas on what could be causing this?

 Thanks,
 Shailesh


 2018-10-24 17:04:13,365 INFO
 org.apache.flink.runtime.taskmanager.Task -
 SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
 (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
 org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
 function.
 at
 org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
 at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
 at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
 at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
 at
 org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
 at
 org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
 at
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 at
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.flink.util.WrappingRuntimeException:
 java.lang.ArrayIndexOutOfBoundsException: -1
 at
 org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
 at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
 at
 org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
 at
 org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
 at
 org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
 at
 com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
 at
 com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
 at