Re: Flink ML feature

2019-12-09 Thread vino yang
Hi Chandu,

AFAIK, there is a project named Alink[1], a machine learning algorithm
platform based on Flink developed by the PAI team of the Alibaba computing
platform. FYI.

Best,
Vino

[1]: https://github.com/alibaba/Alink

Tom Blackwood  于2019年12月10日周二 下午2:07写道:

> You may try Spark ML, which is a production ready library for ML stuff.
>
> regards.
>
> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>
>> Hello Community,
>>
>> Can you please give me some pointers for implementing Machine Learning
>> using Flink.
>>
>> I see the Flink ML libraries were dropped in v1.9. It looks like the ML
>> feature in Flink is going to be enhanced.
>>
>> What is the recommended approach for implementing production-grade ML
>> based apps using Flink? Is v1.9 OK, or should I wait for 1.10?
>>
>> Thanks,
>> Chandu
>>
>


Re: Flink ML feature

2019-12-09 Thread Tom Blackwood
You may try Spark ML, which is a production ready library for ML stuff.

regards.

On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:

> Hello Community,
>
> Can you please give me some pointers for implementing Machine Learning
> using Flink.
>
> I see the Flink ML libraries were dropped in v1.9. It looks like the ML
> feature in Flink is going to be enhanced.
>
> What is the recommended approach for implementing production-grade ML
> based apps using Flink? Is v1.9 OK, or should I wait for 1.10?
>
> Thanks,
> Chandu
>


Flink SQL Kafka topic DDL ,the kafka' json field conflict with flink SQL Keywords

2019-12-09 Thread LakeShen
Hi community, when I use Flink SQL DDL, a JSON field in the Kafka topic
conflicts with a Flink SQL keyword. My current thought is to use a UDTF to
work around it. Is there a more graceful way to solve this problem?


Re: SQL for Avro GenericRecords on Parquet

2019-12-09 Thread Peter Huang
Hi Hanan,

I created a fix for the problem. Would you please try it from your side?
https://github.com/apache/flink/pull/10371


Best Regards
Peter Huang

On Tue, Nov 26, 2019 at 8:07 AM Peter Huang 
wrote:

> Hi Hanan,
>
> After investigating the issue by using the test case you provided, I think
> there is a bug in it. Currently, the Parquet predicate push-down uses the
> predicate literal type to construct the FilterPredicate.
> The issue happens when the data type of the value in the predicate inferred
> from SQL doesn't match the Parquet schema. For example, foo is a long type
> and foo < 1 is the predicate. The literal will be recognized as an Integer,
> which causes the Parquet FilterPredicate to be mistakenly created for a
> column of Integer type. I created a ticket for the issue:
> https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
> insight by commenting directly on it.
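To illustrate the mismatch in terms of Parquet's own FilterApi (a minimal,
hypothetical sketch; the column name "foo" and the literal are example values,
not the actual Flink code):

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PredicateTypeMismatchSketch {
    public static void main(String[] args) {
        // "foo" is declared as an INT64 (long) column in the Parquet schema.

        // What the pushed-down predicate effectively becomes when the SQL
        // literal 1 is treated as a Java Integer:
        FilterPredicate mismatched = FilterApi.lt(FilterApi.intColumn("foo"), 1);

        // What it needs to be for an INT64 column:
        FilterPredicate correct = FilterApi.lt(FilterApi.longColumn("foo"), 1L);
    }
}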
>
>
> Best Regards
> Peter Huang
>
> On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai 
> wrote:
>
>> HI Peter.  Thanks.
>>
>> This is my code .  I used one of the parquet / avro tests as a reference.
>>
>>
>>
>> The code will fail on
>>
>> *Test testScan(ParquetTestCase) failed with:*
>>
>> *java.lang.UnsupportedOperationException*
>>
>> *   at
>> org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)*
>>
>> *   at
>> org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)*
>>
>> *   at
>> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)*
>>
>>
>>
>>
>>
>> CODE :
>>
>>
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.avro.specific.SpecificRecordBuilderBase;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.formats.parquet.ParquetTableSource;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sinks.CsvTableSink;
>> import org.apache.flink.table.sinks.TableSink;
>> import org.apache.flink.test.util.MultipleProgramsTestBase;
>> import org.apache.flink.types.Row;
>> import org.apache.avro.generic.IndexedRecord;
>> import org.apache.parquet.avro.AvroSchemaConverter;
>> import org.apache.parquet.schema.MessageType;
>> import org.junit.BeforeClass;
>> import org.junit.ClassRule;
>> import org.junit.Test;
>> import org.junit.rules.TemporaryFolder;
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.UUID;
>> import static org.junit.Assert.assertEquals;
>> import org.apache.parquet.avro.AvroParquetWriter;
>> import org.apache.parquet.hadoop.ParquetWriter;
>>
>> public class ParquetTestCase extends MultipleProgramsTestBase {
>>
>>     private static String avroSchema = "{\n" +
>>             "  \"name\": \"SimpleRecord\",\n" +
>>             "  \"type\": \"record\",\n" +
>>             "  \"fields\": [\n" +
>>             "{ \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "{ \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "{ \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
>>             "  ],\n" +
>>             "  \"schema_id\": 1,\n" +
>>             "  \"type\": \"record\"\n" +
>>             "}";
>>
>>     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
>>     private static Schema schm = new Schema.Parser().parse(avroSchema);
>>     private static Path testPath;
>>
>>     public ParquetTestCase() {
>>         super(TestExecutionMode.COLLECTION);
>>     }
>>
>>     @BeforeClass
>>     public static void setup() throws Exception {
>> G

Flink ML feature

2019-12-09 Thread chandu soa
Hello Community,

Can you please give me some pointers for implementing Machine Learning
using Flink.

I see the Flink ML libraries were dropped in v1.9. It looks like the ML
feature in Flink is going to be enhanced.

What is the recommended approach for implementing production-grade ML based
apps using Flink? Is v1.9 OK, or should I wait for 1.10?

Thanks,
Chandu


Re: Emit intermediate accumulator state of a session window

2019-12-09 Thread chandu soa
Thank you all for your responses.

I've created a custom trigger similar to Flink's provided EventTimeTrigger,
with a few changes: fire on onElement(), and do not fire on onEventTime(), to
satisfy my requirement that whenever a new event arrives the incremental
result (the result of AggregateFunction#add()) is fired immediately. Find the
changed code block below.

@Override
public TriggerResult onElement(Object element, long timestamp,
        TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.*FIRE*; // instead of CONTINUE
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window,
        TriggerContext ctx) {
    return time == window.maxTimestamp() ?
            TriggerResult.*CONTINUE* : // instead of FIRE
            TriggerResult.CONTINUE;
}

Thanks,
Chandu


On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch  wrote:

> Hi Chandu,
>
> Maybe you can use a custom trigger:
> * .trigger(**ContinuousEventTimeTrigger.of(Time.minutes(15)))*
>
> This would continuously trigger your aggregate every period of time.
>
> Thanks,
> Rafi
>
>
> On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin 
> wrote:
>
>> Hi Chandu,
>>
>> I am not sure whether using the windowing API is helpful in this case at
>> all.
>>
>> At least, you could try to consume the data not only by windowing but
>> also by a custom stateful function.
>> You could look into the AggregatingState [1]. Then you can do whatever you
>> want with the current aggregated value.
>> If you still need to do something with the result of windowing, you could
>> do it as now, or simulate it with timers [2] in that same stateful function.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
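A minimal, untested sketch of that idea, assuming the stream is keyed by a
String session id and each event simply carries "minutes watched" as a Double;
the class names and the 120-minute total below are made up for illustration:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Emits the running aggregate on every incoming element instead of waiting for the window. */
public class EmitOnEveryEvent extends KeyedProcessFunction<String, Double, Double> {

    /** Sums watched minutes and turns them into a percentage of a 120-minute video. */
    public static class WatchedMinutesAgg implements AggregateFunction<Double, Double, Double> {
        @Override public Double createAccumulator() { return 0.0; }
        @Override public Double add(Double value, Double acc) { return acc + value; }
        @Override public Double getResult(Double acc) { return acc / 120.0 * 100.0; }
        @Override public Double merge(Double a, Double b) { return a + b; }
    }

    private transient AggregatingState<Double, Double> progress;

    @Override
    public void open(Configuration parameters) {
        progress = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("progress", new WatchedMinutesAgg(), Double.class));
    }

    @Override
    public void processElement(Double minutesWatched, Context ctx, Collector<Double> out) throws Exception {
        progress.add(minutesWatched);   // update the accumulator
        out.collect(progress.get());    // emit the current aggregate immediately
        // ctx.timerService() could additionally emulate the "window closed" result if needed
    }
}

It would be wired up via something like stream.keyBy(...).process(new EmitOnEveryEvent()).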
>>
>> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>>
>>> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
>>> session window when new event arrives*
>>>
>>>
>>>
>>> AggregateFunction#getResult() is called only when the window completes. My
>>> need is to emit intermediate accumulator values (the result of
>>> AggregateFunction#add()) as well and write them to the sink. Both
>>> AggregateFunction#getResult() and ProcessWindowFunction() provide the
>>> aggregated result only when the window is closed.
>>>
>>> *Any thoughts, please, on how to emit or stream the intermediate accumulator
>>> state as soon as a new event arrives while the window is open? Do I need to
>>> implement a custom trigger or assigner?*
>>>
>>>
>>>
>>> To give you some background: when a user watches a video we get events -
>>> when it is clicked, thereafter every ~15 minutes, and finally when the user
>>> closes the video.
>>>
>>> I need to aggregate them as soon as they arrive and post the result to the
>>> destination. For example, if a user is watching a two-hour movie I get events
>>> at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event I need
>>> to aggregate the watched percentage so far and write it to the sink (0%,
>>> 12.5%, 25%, ..., 100%). The implementation below emits (via getResult()) a
>>> single event 20 minutes after watching a video.
>>>
>>>
>>>
>>>
>>>
>>> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>>>
>>>
>>> .aggregate(new EventAggregator())
>>>
>>>
>>> .filter(new FinalFilter())
>>>
>>>
>>> .addSink(...)
>>>
>>>
>>> Appreciate your help.
>>>
>>>
>>> Thanks,
>>>
>>> chandu
>>>
>>


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread Yang Wang
Hi Li Peng,

Are you running a standalone session cluster or a per-job cluster on
Kubernetes? If so, I think you need to check the log4j.properties inside the
image, not the local one. The log is written to /opt/flink/log/jobmanager.log
by default.

If you are trying the active Kubernetes integration for a fresh taste, the
following CLI option can be used to remove the redirect:
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
%jvmopts% %logging% %class% %args%"

Best,
Yang

vino yang  于2019年12月10日周二 上午10:55写道:

> Hi Li,
>
> A potential reason could be conflicting logging frameworks. Can you share
> the log in your .out file and let us know if the print format of the log is
> the same as the configuration file you gave.
>
> Best,
> Vino
>
> Li Peng  于2019年12月10日周二 上午10:09写道:
>
>> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
>> logs *) completely ignore any of the configurations I put into
>> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
>> logging from flink loggers
>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even made
>> copied the same properties to /flink/conf/log4j-console.properties
>> and log4j-cli.properties.
>>
>> From what I can tell, kubernetes just listens to stdout and stderr, so
>> shouldn't the log4j.properties control output to them? Anyone seen this
>> issue before?
>>
>> Here is my log4j.properties:
>>
>>
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=WARN, file, console, stdout
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=WARN
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # Log all infos in the given file
>> log4j.appender.file=org.apache.log4j.FileAppender
>> log4j.appender.file.file=${log.file}
>> log4j.appender.file.append=false
>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
>> %-5p %-60c %x - %m%n
>>
>> # Log all infos to the console
>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
>> %-5p %-60c %x - %m%n
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>>  file, console
>> log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
>> log4j.logger.org.apache.flink.runtime.checkpoint=WARN
>>
>> Thanks,
>> Li
>>
>


Re: Apache Flink - Retries for async processing

2019-12-09 Thread Jingsong Li
Hi M Singh,

We have this scenario internally too. As far as I know, Flink still does not
have a built-in retry mechanism in 1.9.
I can share my solution:
- In the async function, start a thread pool (via a thread factory).
- When a call fails, resubmit it to the thread pool, and refresh the security
token there as well.
You can essentially handle anything inside the function, as long as the
relevant methods of ResultFuture are eventually called.
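A rough, untested sketch of that pattern (the class name is made up, and
callService/refreshToken are placeholders for your own HTTP and token-refresh
code, not Flink APIs):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public abstract class RetryingAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_RETRIES = 3;
    private transient ExecutorService executor;

    /** The actual HTTP call, using whatever token is currently valid. */
    protected abstract String callService(String request) throws Exception;

    /** Refresh the (expired) security token before retrying. */
    protected abstract void refreshToken();

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    @Override
    public void asyncInvoke(String request, ResultFuture<String> resultFuture) {
        executor.submit(() -> callWithRetry(request, resultFuture, 0));
    }

    private void callWithRetry(String request, ResultFuture<String> resultFuture, int attempt) {
        try {
            resultFuture.complete(Collections.singleton(callService(request)));
        } catch (Exception e) {
            if (attempt < MAX_RETRIES) {
                refreshToken(); // e.g. when the failure was caused by an expired token
                executor.submit(() -> callWithRetry(request, resultFuture, attempt + 1));
            } else {
                resultFuture.completeExceptionally(e);
            }
        }
    }
}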

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:

> Hi Folks:
>
> I am working on a project where I will be using Flink's async processing
> capabilities.  The job has to make http request using a token.  The token
> expires periodically and needs to be refreshed.
>
> So, I was looking for patterns for handling async call failures and
> retries when the token expires. I found the thread "Re: Backoff strategies
> for async IO functions?" and it appears that Flink does not support retries
> or periodically refreshing a security token. I am using 1.6 at the moment
> but am planning to migrate to 1.9 soon.
>
> If there are any patterns on how to deal with this scenario, please let me
> know.
>
> Thanks
>
> Mans
>
>

-- 
Best, Jingsong Lee


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread Yun Tang
Hi Peng

Which kind of K8s deployment from the Flink docs [1] did you try? If you are
using session mode, you can control your log4j configuration via a ConfigMap
[2]. From my experience, this controls log4j well.

If you did not override the command of the Flink Docker image, it will run the
taskmanager with start-foreground.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#session-cluster-resource-definitions

Best
Yun Tang

From: Li Peng 
Date: Tuesday, December 10, 2019 at 10:09 AM
To: user 
Subject: Flink on Kubernetes seems to ignore log4j.properties

Hey folks, I noticed that my kubernetes flink logs (reached via kubectl logs 
) completely ignore any of the configurations I put into 
/flink/conf/. I set the logger level to WARN, yet I still see INFO level 
logging from flink loggers like 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even copied
the same properties to /flink/conf/log4j-console.properties and
log4j-cli.properties.

From what I can tell, kubernetes just listens to stdout and stderr, so 
shouldn't the log4j.properties control output to them? Anyone seen this issue 
before?

Here is my log4j.properties:

# This affects logging for both user code and Flink
log4j.rootLogger=WARN, file, console, stdout

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=WARN

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
%-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
 file, console
log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
log4j.logger.org.apache.flink.runtime.checkpoint=WARN
Thanks,
Li


Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-09 Thread Jingsong Li
Hi Kostas,

I took a look at StreamingFileSink.close(): it just deletes all temporary
files. I understand that is for failover - when the job fails, it should just
delete the temp files for the next restart.
But for testing purposes, we just want to run a bounded streaming job. If no
checkpoint is triggered, nothing will move the final temp files to the output
path, so the result of this job is wrong.
Do you have any idea about this? Can we distinguish a "fail close" from a
"successful finish close" in StreamingFileSink?

Best,
Jingsong Lee

On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  wrote:

> Hi Li,
>
> This is the expected behavior. All the "exactly-once" sinks in Flink
> require checkpointing to be enabled.
> We will update the documentation to be clearer in the upcoming release.
>
> Thanks a lot,
> Kostas
>
> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  wrote:
> >
> > Ok I seem to have solved the issue by enabling checkpointing. Based on
> the docs (I'm using 1.9.0), it seemed like only
> StreamingFileSink.forBulkFormat() should've required checkpointing, but
> based on this experience, StreamingFileSink.forRowFormat() requires it too!
> Is this the intended behavior? If so, the docs should probably be updated.
> >
> > Thanks,
> > Li
> >
> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:
> >>
> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every
> minute, with flink-s3-fs-hadoop, and based on the default rolling policy,
> which is configured to "roll" every 60 seconds, I thought that would be
> automatic (I interpreted rolling to mean actually close a multipart upload
> to s3).
> >>
> >> But I'm not actually seeing files written to s3 at all, instead I see a
> bunch of open multipart uploads when I check the AWS s3 console, for
> example:
> >>
> >>  "Uploads": [
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-0-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T20:57:47.000Z",
> >> "Key": "2019-12-06--20/part-1-0"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:03:12.000Z",
> >> "Key": "2019-12-06--21/part-0-1"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:04:15.000Z",
> >> "Key": "2019-12-06--21/part-0-2"
> >> },
> >> {
> >> "Initiated": "2019-12-06T21:22:23.000Z"
> >> "Key": "2019-12-06--21/part-0-3"
> >> }
> >> ]
> >>
> >> And these uploads are being open for a long time. So far after an hour,
> none of the uploads have been closed. Is this the expected behavior? If I
> wanted to get these uploads to actually write to s3 quickly, do I need to
> configure the hadoop stuff to get that done, like setting a smaller
> buffer/partition size to force it to upload?
> >>
> >> Thanks,
> >> Li
>


-- 
Best, Jingsong Lee


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread vino yang
Hi Li,

A potential reason could be conflicting logging frameworks. Can you share
the log in your .out file and let us know whether the print format of the log
matches the pattern in the configuration file you gave?

Best,
Vino

Li Peng  于2019年12月10日周二 上午10:09写道:

> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
> logs *) completely ignore any of the configurations I put into
> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
> logging from flink loggers
> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
> copied the same properties to /flink/conf/log4j-console.properties
> and log4j-cli.properties.
>
> From what I can tell, kubernetes just listens to stdout and stderr, so
> shouldn't the log4j.properties control output to them? Anyone seen this
> issue before?
>
> Here is my log4j.properties:
>
>
> # This affects logging for both user code and Flink
> log4j.rootLogger=WARN, file, console, stdout
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=WARN
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
> %-60c %x - %m%n
>
> # Log all infos to the console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>  file, console
> log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
> log4j.logger.org.apache.flink.runtime.checkpoint=WARN
>
> Thanks,
> Li
>


Flink on Kubernetes seems to ignore log4j.properties

2019-12-09 Thread Li Peng
Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
logs *) completely ignore any of the configurations I put into
/flink/conf/. I set the logger level to WARN, yet I still see INFO level
logging from flink loggers
like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even made
copied the same properties to /flink/conf/log4j-console.properties
and log4j-cli.properties.

From what I can tell, kubernetes just listens to stdout and stderr, so
shouldn't the log4j.properties control output to them? Anyone seen this
issue before?

Here is my log4j.properties:


# This affects logging for both user code and Flink
log4j.rootLogger=WARN, file, console, stdout

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=WARN

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file, console
log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
log4j.logger.org.apache.flink.runtime.checkpoint=WARN

Thanks,
Li


Apache Flink - Retries for async processing

2019-12-09 Thread M Singh
Hi Folks:
I am working on a project where I will be using Flink's async processing
capabilities. The job has to make HTTP requests using a token. The token
expires periodically and needs to be refreshed.

So, I was looking for patterns for handling async call failures and retries
when the token expires. I found the thread "Re: Backoff strategies for async
IO functions?" and it appears that Flink does not support retries or
periodically refreshing a security token. I am using 1.6 at the moment but am
planning to migrate to 1.9 soon.

If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans


Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-09 Thread Kostas Kloudas
Hi Li,

This is the expected behavior. All the "exactly-once" sinks in Flink
require checkpointing to be enabled.
We will update the documentation to be clearer in the upcoming release.

Thanks a lot,
Kostas
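For anyone hitting this, a minimal sketch of enabling checkpointing (the
interval and mode below are just example values):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink only commits in-progress part files on checkpoints,
        // so checkpointing has to be enabled for output to become visible.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // ... add sources, the StreamingFileSink, and call env.execute() as usual ...
    }
}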

On Sat, Dec 7, 2019 at 3:47 AM Li Peng  wrote:
>
> Ok I seem to have solved the issue by enabling checkpointing. Based on the 
> docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() 
> should've required checkpointing, but based on this experience, 
> StreamingFileSink.forRowFormat() requires it too! Is this the intended 
> behavior? If so, the docs should probably be updated.
>
> Thanks,
> Li
>
> On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:
>>
>> Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, 
>> with flink-s3-fs-hadoop, and based on the default rolling policy, which is 
>> configured to "roll" every 60 seconds, I thought that would be automatic (I 
>> interpreted rolling to mean actually close a multipart upload to s3).
>>
>> But I'm not actually seeing files written to s3 at all, instead I see a 
>> bunch of open multipart uploads when I check the AWS s3 console, for example:
>>
>>  "Uploads": [
>> {
>> "Initiated": "2019-12-06T20:57:47.000Z",
>> "Key": "2019-12-06--20/part-0-0"
>> },
>> {
>> "Initiated": "2019-12-06T20:57:47.000Z",
>> "Key": "2019-12-06--20/part-1-0"
>> },
>> {
>> "Initiated": "2019-12-06T21:03:12.000Z",
>> "Key": "2019-12-06--21/part-0-1"
>> },
>> {
>> "Initiated": "2019-12-06T21:04:15.000Z",
>> "Key": "2019-12-06--21/part-0-2"
>> },
>> {
>> "Initiated": "2019-12-06T21:22:23.000Z"
>> "Key": "2019-12-06--21/part-0-3"
>> }
>> ]
>>
>> And these uploads are being open for a long time. So far after an hour, none 
>> of the uploads have been closed. Is this the expected behavior? If I wanted 
>> to get these uploads to actually write to s3 quickly, do I need to configure 
>> the hadoop stuff to get that done, like setting a smaller buffer/partition 
>> size to force it to upload?
>>
>> Thanks,
>> Li


Re: User program failures cause JobManager to be shutdown

2019-12-09 Thread 김동원
Hi Robert,

Yeah, I know. For the moment, I warned my colleagues not to call System.exit() 
:-) But it needs to be implemented for the sake of Flink usability as you 
described in the issue.
Thanks a lot for taking care of this issue.

Best,

Dongwon

> 2019. 12. 9. 오후 9:55, Robert Metzger  작성:
> 
> 
> Hey Dongwon,
> I filed a ticket: https://issues.apache.org/jira/browse/FLINK-15156
> This does not mean it will be implemented anytime soon :)
> 
>> On Mon, Dec 9, 2019 at 2:25 AM Dongwon Kim  wrote:
>> Hi Robert and Roman, 
>> Yeah, letting users know System.exit() is called would be much more 
>> appropriate than just intercepting and ignoring.
>> 
>> Best,
>> Dongwon
>> 
>>> On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger  wrote:
>>> I guess we could manage the security only when calling the user's main() 
>>> method.
>>> 
>>> This problem actually exists for all usercode in Flink: You can also kill 
>>> TaskManagers like this.
>>> If we are going to add something like this to Flink, I would only log that 
>>> System.exit() has been called by the user code, not intercept and ignore 
>>> the call.
>>> 
 On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 This should work but it could also interfere with Flink itself exiting in 
 case of a fatal error.
 
 Regards,
 Roman
 
 
> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim  wrote:
> FYI, we've launched a session cluster where multiple jobs are managed by 
> a job manager. If that happens, all the other jobs also fail because the 
> job manager is shut down and all the task managers get into chaos 
> (failing to connect to the job manager).
> 
> I just searched a way to prevent System.exit() calls from terminating 
> JVMs and found [1]. Can it be a possible solution to the problem?
> 
> [1] 
> https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
> 
> Best,
> - Dongwon
> 
>> On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim  
>> wrote:
>> Hi Robert and Roman,
>> 
>> Thank you for taking a look at this.
>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>> 
>> I just found that our HTTP client is programmed to call System.exit(1). 
>> I should guide not to call System.exit() in Flink applications. 
>> 
>> p.s. Just out of curiosity, is there no way for the web app to intercept 
>> System.exit() and prevent the job manager from being shutting down?
>> 
>> Best,
>> 
>> - Dongwon
>> 
>>> On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger  
>>> wrote:
>>> Hi Dongwon,
>>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>>> 
>>> By the way, the http address from the error message is publicly 
>>> available. Not sure if this is internal data or not.
>>> 
 On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 I wasn't able to reproduce your problem with Flink JobManager 1.9.1 
 with various kinds of errors in the job.
 I suggest you try it on a fresh Flink installation without any other 
 jobs submitted.
 
 Regards,
 Roman
 
 
> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim  
> wrote:
> Hi Roman,
> 
> We're using the latest version 1.9.1 and those two lines are all I've 
> seen after executing the job on the web ui.
> 
> Best,
> 
> Dongwon
> 
>> On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan 
>>  wrote:
>> Hi Dongwon,
>> 
>> Could you please provide Flink version you are running and the job 
>> manager
>> logs?
>> 
>> Regards,
>> Roman
>> 
>> 
>> eastcirclek wrote
>> > Hi,
>> > 
>> > I tried to run a program by uploading a jar on Flink UI. When I
>> > intentionally enter a wrong parameter to my program, JobManager 
>> > dies.
>> > Below
>> > is all log messages I can get from JobManager; JobManager dies as 
>> > soon as
>> > spitting the second line:
>> > 
>> > 2019-12-05 04:47:58,623 WARN
>> >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>> >> Configuring the job submission via query parameters is 
>> >> deprecated. Please
>> >> migrate to submitting a JSON request instead.
>> >>
>> >>
>> >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
>> >>   - Cannot
>> >> connect:http://52.1

Re: User program failures cause JobManager to be shutdown

2019-12-09 Thread Robert Metzger
Hey Dongwon,
I filed a ticket: https://issues.apache.org/jira/browse/FLINK-15156
This does not mean it will be implemented anytime soon :)

On Mon, Dec 9, 2019 at 2:25 AM Dongwon Kim  wrote:

> Hi Robert and Roman,
> Yeah, letting users know System.exit() is called would be much more
> appropriate than just intercepting and ignoring.
>
> Best,
> Dongwon
>
> On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger 
> wrote:
>
>> I guess we could manage the security only when calling the user's main()
>> method.
>>
>> This problem actually exists for all usercode in Flink: You can also kill
>> TaskManagers like this.
>> If we are going to add something like this to Flink, I would only log
>> that System.exit() has been called by the user code, not intercept and
>> ignore the call.
>>
>> On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> This should work but it could also interfere with Flink itself exiting
>>> in case of a fatal error.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim 
>>> wrote:
>>>
 FYI, we've launched a session cluster where multiple jobs are managed
 by a job manager. If that happens, all the other jobs also fail because the
 job manager is shut down and all the task managers get into chaos (failing
 to connect to the job manager).

 I just searched a way to prevent System.exit() calls from terminating
 JVMs and found [1]. Can it be a possible solution to the problem?

 [1]
 https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
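A rough sketch of what that Stack Overflow approach amounts to: a
SecurityManager that intercepts the exit call (illustrative only; as Robert
notes above, Flink would more likely just log the call rather than swallow it):

public class NoExitSecurityManager extends SecurityManager {

    @Override
    public void checkExit(int status) {
        // Called by System.exit()/Runtime.exit(); throwing here prevents the JVM
        // from terminating. Logging instead of throwing would match Robert's suggestion.
        throw new SecurityException("System.exit(" + status + ") called by user code");
    }

    @Override
    public void checkPermission(java.security.Permission perm) {
        // allow everything else
    }
}

// Installed (e.g. before invoking the user jar's main()) via:
// System.setSecurityManager(new NoExitSecurityManager());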

 Best,
 - Dongwon

 On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim 
 wrote:

> Hi Robert and Roman,
>
> Thank you for taking a look at this.
>
> what is your main() method / client doing when it's receiving wrong
>> program parameters? Does it call System.exit(), or something like that?
>>
>
> I just found that our HTTP client is programmed to call
> System.exit(1). I should guide not to call System.exit() in Flink
> applications.
>
> p.s. Just out of curiosity, is there no way for the web app to
> intercept System.exit() and prevent the job manager from being shutting
> down?
>
> Best,
>
> - Dongwon
>
> On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger 
> wrote:
>
>> Hi Dongwon,
>>
>> what is your main() method / client doing when it's receiving wrong
>> program parameters? Does it call System.exit(), or something like that?
>>
>> By the way, the http address from the error message is
>> publicly available. Not sure if this is internal data or not.
>>
>> On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> I wasn't able to reproduce your problem with Flink JobManager 1.9.1
>>> with various kinds of errors in the job.
>>> I suggest you try it on a fresh Flink installation without any other
>>> jobs submitted.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim 
>>> wrote:
>>>
 Hi Roman,

 We're using the latest version 1.9.1 and those two lines are all
 I've seen after executing the job on the web ui.

 Best,

 Dongwon

 On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan <
 khachatryan.ro...@gmail.com> wrote:

> Hi Dongwon,
>
> Could you please provide Flink version you are running and the job
> manager
> logs?
>
> Regards,
> Roman
>
>
> eastcirclek wrote
> > Hi,
> >
> > I tried to run a program by uploading a jar on Flink UI. When I
> > intentionally enter a wrong parameter to my program, JobManager
> dies.
> > Below
> > is all log messages I can get from JobManager; JobManager dies
> as soon as
> > spitting the second line:
> >
> > 2019-12-05 04:47:58,623 WARN
> >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
> >> Configuring the job submission via query parameters is
> deprecated. Please
> >> migrate to submitting a JSON request instead.
> >>
> >>
> >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
> >>   - Cannot
> >> connect:
> http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models
> >> <
> http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models>
> ;:
> >> com.fasterxml.jackson.databind.exc.MismatchedInputException:
> Cannot
> >> deserialize instance of `java.util.ArrayList` out of
> START_OBJECT t

Re: Change Flink binding address in local mode

2019-12-09 Thread Andrea Cardaci
On Mon, 9 Dec 2019 at 12:54, Chesnay Schepler  wrote:
> At this point I would suggest to file a ticket

Here it is: https://issues.apache.org/jira/browse/FLINK-15154


Re: Change Flink binding address in local mode

2019-12-09 Thread Chesnay Schepler
At this point I would suggest filing a ticket; these are the options
that _should_ control the behavior but apparently don't in all cases.


On 08/12/2019 12:23, Andrea Cardaci wrote:

Hi,

Flink (or some of its services) listens on three random TCP ports
during the local[1] execution, e.g., 39951, 41009 and 42849.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html#local-environment

The sockets listen on `0.0.0.0`, and since I need to run some
long-running tests on an Internet-facing machine I was wondering how
to make them listen on `localhost` instead, or if there is anything
else I can do to improve the security in this scenario.

Here's what I tried (with little luck):


Configuration config = new Configuration();
config.setString("taskmanager.host", "127.0.0.1");
config.setString("rest.bind-address", "127.0.0.1"); // OK
config.setString("jobmanager.rpc.address", "127.0.0.1");
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.getDefaultLocalParallelism(),
 config);

Only the `rest.bind-address` configuration actually changes the
binding address of one of those ports. Are there other parameters that
I'm not aware of or this is not the right approach in local mode?


Best,
Andrea





Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Arvid Heise
Hi Komal,

as a general rule of thumb, you want to avoid network shuffles as much as
possible. As vino pointed out, you need to reshuffle, if you need to group
by key. Another frequent usecase is for a rebalancing of data in case of a
heavy skew. Since neither applies to you, removing the keyby is the best
option.

If you want to retain it, because you may experience skew in the future,
there are only a couple of things you can do. You may tinker with
networking settings to have smaller/larger network buffers (smaller = less
latency, larger = more throughput) [1]. Of course, you get better results
if you have a faster network (running in the cloud, you can play around
with different adapters). Also you could try if less/more machines are
actually faster (less machines = less network traffic, more machines = more
compute power).

In any case, your data volume is so low that I would probably not optimize
too much. We are talking about seconds and the times may vary largely from
run to run, because of the low data volume. If you want to test the
throughput as a POC for a larger volume, I'd either generate a larger
sample or replicate it to get more reliable numbers. In any case, try to
have your final use case in mind when deciding for an option.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers

On Mon, Dec 9, 2019 at 10:25 AM vino yang  wrote:

> Hi Komal,
>
> Actually, the main factor in choosing the type of partitioning is your
> business logic. If you want to do some aggregation logic per group, you must
> use keyBy to guarantee correct semantics.
>
> Best,
> Vino
>
> Komal Mariam  于2019年12月9日周一 下午5:07写道:
>
>> Thank you @vino yang   for the reply. I suspect
>> keyBy will beneficial in those cases where my subsequent operators are
>> computationally intensive. Their computation time being > than network
>> reshuffling cost.
>>
>> Regards,
>> Komal
>>
>> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>>
>>> Hi Komal,
>>>
>>> keyBy (hash partitioning, a logical partitioning) and rebalance (a physical
>>> partitioning) are both partitioning strategies supported by Flink.[1]
>>>
>>> Generally speaking, partitioning may cause network communication (network
>>> shuffle) costs, which add to the total time. The example you provided may
>>> benefit from operator chaining[2] if you remove the keyBy operation.
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>>
>>> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>>>
 Anyone?

 On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
 wrote:

> Hello everyone,
>
> I want to get some insights on the KeyBy (and Rebalance) operations as
> according to my understanding they partition our tasks over the defined
> parallelism and thus should make our pipeline faster.
>
> I am reading a topic which contains 170,000,000 pre-stored records
> with 11 Kafka partitions and replication factor of 1.   Hence I use
> .setStartFromEarliest() to read the stream.
> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
> and 1 job manager with 6 cores. (10 task slots per TM hence I set
> environment parallelism to 30).
>
> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
> keeping the number of records fixed to get a handle on how fast they're
> being processed.
>
> When I remove keyBy, I get the same results in 39 secs as opposed to
> 52 secs with KeyBy. Infact, even when I vary the parallelism down to 10 or
> below I still get the same extra overhead of 9 to 13secs. My data is 
> mostly
> uniformly distributed on it's key so I can rule out skew.  Rebalance
> likewise has the same latency as keyBy.
>
>  What I want to know is what may be causing this overhead? And is
> there any way to decrease it?
>
> Here's the script I'm running for testing purposes:
> --
> DataStream JSONStream  = env.addSource(new
> FlinkKafkaConsumer<>("data", new
> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>
> DataStream myPoints = JSONStream.map(new jsonToPoint());
>
> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction {
> public boolean filter(Point input) throws Exception {
> Double distance = computeEuclideanDist(
> 16.4199  , 89.974  ,input.X(),input.Y);
>  return distance > 0;
> }
> }
>
> Best Regards,
> Komal
>



Job manager is failing to start with an S3 no key specified exception [1.7.2]

2019-12-09 Thread Kumar Bolar, Harshith
Hi all,

I'm running a standalone Flink cluster with Zookeeper and S3 for high 
availability storage. All of a sudden, the job managers started failing with an 
S3 `UnrecoverableS3OperationException` error. Here is the full error trace -

```
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$UnrecoverableS3OperationException:
 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error 
Code: NoSuchKey; Request ID: 1769066EBD605AB5; S3 Extended Request ID: 
K8jjbsE4DPAsZJDVJKBq3Nh0E0o+feafefavbvbaae+nbUTphHHw73/eafafefa+dsVMR0=), S3 
Extended Request ID: 
lklalkioe+eae2234+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0= (Path: 
s3://abc-staging/flink/jobmanagerha/flink-2/blob/job_3e16166a1122885eb6e9b2437929b266/blob_p-3b687174148e9e1dd951f2a9fbec83f4fcd5281e-b85417f69b354c83b270bf01dcf389e0)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:908)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:893)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:878)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:871)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:810)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at 
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:809)
... 10 more
Caused by: 
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error 
Code: NoSuchKey; Request ID: 1769066EBaD6aefB5; S3 Extended Request ID: 
fealloga+4rVwsF+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0=), S3 Extended Request 
ID: K8jjbsE4DPAsZJDVJKBq3Nh0E0o+4rVwsF+nbUTphHHweafga/lc1o1YH7M0MeNjmXl+dsVMR0=
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at 
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
... 30 more
```

I could fix this by changing the `high-availability.cluster-id` property (which 
is currently set to `flink-2`) but with that I would lose all the existing jobs 
and state. Is there any way I can tell Flink to ignore this particular key in 
S3 and start the job managers?

Thanks,
Harshith


Re: KeyBy/Rebalance overhead?

2019-12-09 Thread vino yang
Hi Komal,

Actually, the main factor in choosing the type of partitioning is your
business logic. If you want to do some aggregation logic per group, you must
use keyBy to guarantee correct semantics.

Best,
Vino

Komal Mariam  于2019年12月9日周一 下午5:07写道:

> Thank you @vino yang   for the reply. I suspect
> keyBy will beneficial in those cases where my subsequent operators are
> computationally intensive. Their computation time being > than network
> reshuffling cost.
>
> Regards,
> Komal
>
> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>
>> Hi Komal,
>>
>> keyBy (hash partitioning, a logical partitioning) and rebalance (a physical
>> partitioning) are both partitioning strategies supported by Flink.[1]
>>
>> Generally speaking, partitioning may cause network communication (network
>> shuffle) costs, which add to the total time. The example you provided may
>> benefit from operator chaining[2] if you remove the keyBy operation.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>
>> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>>
>>> Anyone?
>>>
>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
>>> wrote:
>>>
 Hello everyone,

 I want to get some insights on the KeyBy (and Rebalance) operations as
 according to my understanding they partition our tasks over the defined
 parallelism and thus should make our pipeline faster.

 I am reading a topic which contains 170,000,000 pre-stored records with
 11 Kafka partitions and replication factor of 1.   Hence I use
 .setStartFromEarliest() to read the stream.
 My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
 and 1 job manager with 6 cores. (10 task slots per TM hence I set
 environment parallelism to 30).

 There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
 keeping the number of records fixed to get a handle on how fast they're
 being processed.

 When I remove keyBy, I get the same results in 39 secs as opposed to 52
 secs with KeyBy. Infact, even when I vary the parallelism down to 10 or
 below I still get the same extra overhead of 9 to 13secs. My data is mostly
 uniformly distributed on it's key so I can rule out skew.  Rebalance
 likewise has the same latency as keyBy.

  What I want to know is what may be causing this overhead? And is there
 any way to decrease it?

 Here's the script I'm running for testing purposes:
 --
 DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
 new
 JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())

 DataStream myPoints = JSONStream.map(new jsonToPoint());

 mypoints.keyBy("oID").filter(new findDistancefromPOI());

 public class findDistancefromPOI extends RichFilterFunction {
 public boolean filter(Point input) throws Exception {
 Double distance = computeEuclideanDist(
 16.4199  , 89.974  ,input.X(),input.Y);
  return distance > 0;
 }
 }

 Best Regards,
 Komal

>>>


Re: Sample Code for querying Flink's default metrics

2019-12-09 Thread vino yang
Hi Pankaj,

> Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

What is your real requirement? Can you use code to call the REST API? Why
does that not match your requirements?

> Additionally, how do I query Backpressure using code, or is it still only
visually available via the dashboard UI? Consequently, is there any way to
infer Backpressure by querying one (or more) of the Memory metrics of the
TaskManager?

Backpressure is related not only to memory metrics but also to IO and
network metrics; for more details about measuring backpressure, please see
these blog posts.[1][2]

[1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
[2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
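If calling the REST API from code is acceptable, a minimal, untested sketch
looks like this (the host/port is a placeholder for your JobManager's REST
endpoint; the endpoint paths and metric names follow the Flink REST API and
metrics docs, but verify them against your version):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MetricsRestSketch {
    public static void main(String[] args) throws Exception {
        // Lists the TaskManagers; each entry contains the id needed for per-TM metrics.
        String base = "http://localhost:8081";
        HttpURLConnection conn = (HttpURLConnection) new URL(base + "/taskmanagers").openConnection();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON with taskmanager ids and hardware info
            }
        }
        // Per-TaskManager metrics (CPU load, heap usage, ...) can then be fetched from, e.g.:
        //   /taskmanagers/<taskmanager-id>/metrics?get=Status.JVM.CPU.Load,Status.JVM.Memory.Heap.Used
    }
}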

Best,
Vino

Pankaj Chand  于2019年12月9日周一 下午12:07写道:

> Hello,
>
> Using Flink on Yarn, I could not understand the documentation for how to
> read the default metrics via code. In particular, I want to read
> throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
> Memory.
>
> Is there any sample code for how to read such default metrics?  Is there
> any way to query the default metrics, such as CPU usage and Memory, without
> using REST API or Reporters?
>
> Additionally, how do I query Backpressure using code, or is it still only
> visually available via the dashboard UI? Consequently, is there any way to
> infer Backpressure by querying one (or more) of the Memory metrics of the
> TaskManager?
>
> Thank you,
>
> Pankaj
>


Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Komal Mariam
Thank you @vino yang   for the reply. I suspect
keyBy will beneficial in those cases where my subsequent operators are
computationally intensive. Their computation time being > than network
reshuffling cost.

Regards,
Komal

On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:

> Hi Komal,
>
> keyBy (hash partitioning, a logical partitioning) and rebalance (a physical
> partitioning) are both partitioning strategies supported by Flink.[1]
>
> Generally speaking, partitioning may cause network communication (network
> shuffle) costs, which add to the total time. The example you provided may
> benefit from operator chaining[2] if you remove the keyBy operation.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>
> Komal Mariam  于2019年12月9日周一 上午9:11写道:
>
>> Anyone?
>>
>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:
>>
>>> Hello everyone,
>>>
>>> I want to get some insights on the KeyBy (and Rebalance) operations as
>>> according to my understanding they partition our tasks over the defined
>>> parallelism and thus should make our pipeline faster.
>>>
>>> I am reading a topic which contains 170,000,000 pre-stored records with
>>> 11 Kafka partitions and replication factor of 1.   Hence I use
>>> .setStartFromEarliest() to read the stream.
>>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
>>> and 1 job manager with 6 cores. (10 task slots per TM hence I set
>>> environment parallelism to 30).
>>>
>>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>>> keeping the number of records fixed to get a handle on how fast they're
>>> being processed.
>>>
>>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>>> secs with KeyBy. Infact, even when I vary the parallelism down to 10 or
>>> below I still get the same extra overhead of 9 to 13secs. My data is mostly
>>> uniformly distributed on it's key so I can rule out skew.  Rebalance
>>> likewise has the same latency as keyBy.
>>>
>>>  What I want to know is what may be causing this overhead? And is there
>>> any way to decrease it?
>>>
>>> Here's the script I'm running for testing purposes:
>>> --
>>> DataStream JSONStream  = env.addSource(new FlinkKafkaConsumer<>("data",
>>> new
>>> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>>>
>>> DataStream myPoints = JSONStream.map(new jsonToPoint());
>>>
>>> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>>>
>>> public class findDistancefromPOI extends RichFilterFunction {
>>> public boolean filter(Point input) throws Exception {
>>> Double distance = computeEuclideanDist(
>>> 16.4199  , 89.974  ,input.X(),input.Y);
>>>  return distance > 0;
>>> }
>>> }
>>>
>>> Best Regards,
>>> Komal
>>>
>>