Re: Filter columns of a csv file with Flink

2018-07-06 Thread Hequn Cheng
Hi francois,

> I see that CsvTableSource allows to define csv fields. Then, will it
check if columns actually exists in the file and throw Exception if not ?
Currently, CsvTableSource doesn't support Avro. CsvTableSource
uses fieldDelim and rowDelim to parse data. But there is a workaround: read
each line from data as a single big column, i.e., the source table only has
one column. Afterward, you can use udtf[1] to split each line. You can
throw away data or throw exceptions in udtf as you wish.

>  I want to check if files structure is right before processing them.
If you want to skip the whole file when the schema is erroneous. You can
write a user defined table source and probably have to write a user defined
InputFormat. You can refer to the AvroInputFormat[2] as an example.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions

[2]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java

On Fri, Jul 6, 2018 at 11:32 PM, françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi Hequn,
>
> The Table-API is really great.
> I will use and certainly love it to solve the issues I mentioned before
>
> One subsequent question regarding Table-API :
> I've got my csv files and avro schemas that describe them.
> As my users can send erroneous files, inconsistent with schemas, I want to
> check if files structure is right before processing them.
> I see that CsvTableSource allows to define csv fields. Then, will it check
> if columns actually exists in the file and throw Exception if not ?
>
> Or is there any other way in Apache Avro to check if a csv file is
> consistent with a given schema?
>
> Big thank to put on the table-api's way :)
>
> Best R
>
> François Lacombe
>
>
>
> 2018-07-06 16:53 GMT+02:00 Hequn Cheng :
>
>> Hi francois,
>>
>> If I understand correctly, you can use sql or table-api to solve you
>> problem.
>> As you want to project part of columns from source, a columnar storage
>> like parquet/orc would be efficient. Currently, ORC table source is
>> supported in flink, you can find more details here[1]. Also, there are many
>> other table sources[2] you can choose. With a TableSource, you can read the
>> data and register it as a Table and do table operations through sql[3] or
>> table-api[4].
>>
>> To make a json string from several columns, you can write a user defined
>> function[5].
>>
>> I also find a OrcTableSourceITCase[6] which I think may be helpful for
>> you.
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sourceSinks.html#orctablesource
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sourceSinks.html#table-sources-sinks
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sql.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/tableApi.html
>> [5] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/udfs.html
>> [6] https://github.com/apache/flink/blob/master/flink-connec
>> tors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableS
>> ourceITCase.java
>>
>>
>> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
>> francois.laco...@dcbrain.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm a new user to Flink community. This tool sounds great to achieve
>>> some data loading of millions-rows files into a pgsql db for a new project.
>>>
>>> As I read docs and examples, a proper use case of csv loading into pgsql
>>> can't be found.
>>> The file I want to load isn't following the same structure than the
>>> table, I have to delete some columns and make a json string from several
>>> others too prior to load to pgsql
>>>
>>> I plan to use Flink 1.5 Java API and a batch process.
>>> Does the DataSet class is able to strip some columns out of the records
>>> I load or should I iterate over each record to delete the columns?
>>>
>>> Same question to make a json string from several columns of the same
>>> record?
>>> E.g json_column =3D {"field1":col1, "field2":col2...}
>>>
>>> I work with 20 millions length files and it sounds pretty ineffective to
>>> iterate over each records.
>>> Can someone tell me if it's possible or if I have to change my mind
>>> about this?
>>>
>>>
>>> Thanks in advance, all the best
>>>
>>> François Lacombe
>>>
>>>
>>
>


Re: Slide Window Compute Optimization

2018-07-06 Thread Rong Rong
+1. Yes your use case would probably fit best in the OVER aggregate use
case.

I actually created for myself a complimentary note

for some of the complex aggregate components on top of Flink SQL/Table API
official doc[1]. If this could help you better understanding how the OVER
aggregate method could fit into your use case. Let me know if it is helpful
:-)

@Fabian, if possible, please share some comments on the note when you have
time. :-)

Thanks,
Rong

On Fri, Jul 6, 2018 at 2:30 AM Fabian Hueske  wrote:

> Hi Yennie,
>
> You might want to have a look at the OVER windows of Flink's Table API or
> SQL [1].
>
> An OVER window computes an aggregate (such as a count) for each incoming
> record over a range of previous events.
> For example the query:
>
> SELECT ip, successful, COUNT(*) OVER (PARTITION BY ip, successful ORDER BY
> loginTime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
>   FROM logins;
>
> computes for each login attempt the number of login attempts of the
> previous hour.
>
> There is no corresponding built-in operator in the DataStream API but SQL
> and Table API queries can be very easily integrated with DataStream
> programs [2].
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>
> 2018-07-06 8:01 GMT+02:00 YennieChen88 :
>
>> Hi Kostas and Rong,
>> Thank you for your reply.
>> As both of you ask for more info about my use case, I now reply in
>> unison.
>> My case is used for counting the number of successful login and
>> failures
>> within one hour, keyBy other login related attributes (e.g. ip, device,
>> login type ...). According to the count result of the previous hour, to
>> judge whether the next login is compliant.
>> We have a high requirement for the flink compute time, to reduce the
>> impact on user login. From receiving source to sinking results into
>> database, only about 10ms time is acceptable. Base on this, we expect the
>> compute result as accurate as possible. The best case without error is the
>> latest sink time after 1-hour compute exactly the same as the user login
>> time which need judge compliance. Is that means the smaller the step size
>> of
>> slide window, the more accurate the results? But Now it seems that the
>> smaller step size of slide window,the longer time need to compute. Because
>> once a element arrives, it will be processed in every window (number of
>> windows = window size/step size)serially through one thread.
>>
>> Rong Rong wrote
>> > Hi Yennie,
>> >
>> > AFAIK, the sliding window will in fact duplicate elements into multiple
>> > different streams. There's a discussion thread regarding this [1].
>> > We are looking into some performance improvement, can you provide some
>> > more
>> > info regarding your use case?
>> >
>> > --
>> > Rong
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas 
>>
>> > k.kloudas@
>>
>> > 
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> You are correct that with sliding windows you will have 3600 “open
>> >> windows” at any point.
>> >> Could you describe a bit more what you want to do?
>> >>
>> >> If you simply want to have an update of something like a counter every
>> >> second, then you can
>> >> implement your own logic with a ProcessFunction that allows to handle
>> >> state and timers in a
>> >> custom way (see [1]).
>> >>
>> >> Hope this helps,
>> >> Kostas
>> >>
>> >> [1]
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>> >>
>> >>
>> >> On Jul 5, 2018, at 12:12 PM, YennieChen88 
>>
>> > chenyanying3@
>>
>> >  wrote:
>> >>
>> >> Hi,
>> >>I want to use slide windows of 1 hour window size and 1 second step
>> >> size. I found that once a element arrives, it will be processed in 3600
>> >> windows serially through one thread. It takes serveral seconds to
>> finish
>> >> one
>> >> element processing,much more than my expection. Do I have any way to
>> >> optimizate it?
>> >>Thank you very much for your reply.
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from:
>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >>
>> >>
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>


Re: Description of Flink event time processing

2018-07-06 Thread Elias Levy
Apologies.  Comments are now enabled.

On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:

> Hi Elias,
>
> Thanks for putting together the document. This is actually a very good,
> well-rounded document.
> I think you did not to enable access for comments for the link. Would you
> mind enabling comments for the google doc?
>
> Thanks,
> Rong
>
>
> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>
>> Hi Elias,
>>
>> Thanks for the great document!
>> I made a pass over it and left a few comments.
>>
>> I think we should definitely add this to the documentation.
>>
>> Thanks,
>> Fabian
>>
>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>
>>> Hi Elias,
>>>
>>> I agree, the docs lack a coherent discussion of event time features.
>>> Thank you for this write up!
>>> I just skimmed your document and will provide more detailed feedback
>>> later.
>>>
>>> It would be great to add such a page to the documentation.
>>>
>>> Best, Fabian
>>>
>>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>>
 The documentation of how Flink handles event time and watermarks is
 spread across several places.  I've been wanting a single location that
 summarizes the subject, and as none was available, I wrote one up.

 You can find it here:
 https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing

 I'd appreciate feedback, particularly about the correctness of the
 described behavior.

>>>
>>>
>>


StateMigrationException when switching from TypeInformation.of to createTypeInformation

2018-07-06 Thread Elias Levy
During some refactoring we changed a job using managed state from:

ListStateDescriptor("config", TypeInformation.of(new
TypeHint[ConfigState]() {}))

to

ListStateDescriptor("config", createTypeInformation[ConfigState])

After this change, Flink refused to start the new job from a savepoint or
checkpoint, raising StateMigrationException instead.

Why is Flink raising this error?  Both TypeInformation.of and
createTypeInformation return TypeInformation[ConfigState], so why does it
think the state type has changed?


Re: flink1.5 web UI

2018-07-06 Thread Vishal Santoshi
It seems it is the UI refresh that forces  the loop on the job server. From
flink cli  it does it once.. So this might be a false alarm.

On Fri, Jul 6, 2018 at 4:55 PM, Vishal Santoshi 
wrote:

> The UI shows the following and the JM goes into a convulsions trying to
> retrieve a jobiid as above.
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>
>
> On Fri, Jul 6, 2018 at 4:53 PM, Vishal Santoshi  > wrote:
>
>> If we submit a job through CLI and it has an error ( missing args and so
>> on ) , the JM goes into convulsions. It seems it submits a job without fist
>> validating and then goes into a loop trying to figure out the job
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: Caused by:
>> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not
>> find Flink job (0de3cb47d32dd25ba0375d97bfd07387)
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
>> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterG
>> atewayFuture(Dispatcher.java:693)
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
>> org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(
>> Dispatcher.java:459)
>>
>> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
>> sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>>
>> Is this known ?
>>
>
>


Re: flink1.5 web UI

2018-07-06 Thread Vishal Santoshi
The UI shows the following and the JM goes into a convulsions trying to
retrieve a jobiid as above.

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error.


On Fri, Jul 6, 2018 at 4:53 PM, Vishal Santoshi 
wrote:

> If we submit a job through CLI and it has an error ( missing args and so
> on ) , the JM goes into convulsions. It seems it submits a job without fist
> validating and then goes into a loop trying to figure out the job
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: Caused by:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not
> find Flink job (0de3cb47d32dd25ba0375d97bfd07387)
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(
> Dispatcher.java:693)
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
> org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:
> 459)
>
> Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
> sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>
> Is this known ?
>


flink1.5 web UI

2018-07-06 Thread Vishal Santoshi
If we submit a job through CLI and it has an error ( missing args and so on
) , the JM goes into convulsions. It seems it submits a job without fist
validating and then goes into a loop trying to figure out the job

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: Caused by:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
Flink job (0de3cb47d32dd25ba0375d97bfd07387)

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:693)

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:459)

Jul 06 16:51:26 flink-9edd15d7.bf2.tumblr.net docker[31171]: at
sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)

Is this known ?


Re: Filter columns of a csv file with Flink

2018-07-06 Thread françois lacombe
Hi Hequn,

The Table-API is really great.
I will use and certainly love it to solve the issues I mentioned before

One subsequent question regarding Table-API :
I've got my csv files and avro schemas that describe them.
As my users can send erroneous files, inconsistent with schemas, I want to
check if files structure is right before processing them.
I see that CsvTableSource allows to define csv fields. Then, will it check
if columns actually exists in the file and throw Exception if not ?

Or is there any other way in Apache Avro to check if a csv file is
consistent with a given schema?

Big thank to put on the table-api's way :)

Best R

François Lacombe



2018-07-06 16:53 GMT+02:00 Hequn Cheng :

> Hi francois,
>
> If I understand correctly, you can use sql or table-api to solve you
> problem.
> As you want to project part of columns from source, a columnar storage
> like parquet/orc would be efficient. Currently, ORC table source is
> supported in flink, you can find more details here[1]. Also, there are many
> other table sources[2] you can choose. With a TableSource, you can read the
> data and register it as a Table and do table operations through sql[3] or
> table-api[4].
>
> To make a json string from several columns, you can write a user defined
> function[5].
>
> I also find a OrcTableSourceITCase[6] which I think may be helpful for
> you.
>
> Best, Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sourceSinks.html#orctablesource
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sourceSinks.html#table-sources-sinks
> [3] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sql.html
> [4] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/tableApi.html
> [5] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/udfs.html
> [6] https://github.com/apache/flink/blob/master/flink-
> connectors/flink-orc/src/test/java/org/apache/flink/orc/
> OrcTableSourceITCase.java
>
>
> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I'm a new user to Flink community. This tool sounds great to achieve some
>> data loading of millions-rows files into a pgsql db for a new project.
>>
>> As I read docs and examples, a proper use case of csv loading into pgsql
>> can't be found.
>> The file I want to load isn't following the same structure than the
>> table, I have to delete some columns and make a json string from several
>> others too prior to load to pgsql
>>
>> I plan to use Flink 1.5 Java API and a batch process.
>> Does the DataSet class is able to strip some columns out of the records I
>> load or should I iterate over each record to delete the columns?
>>
>> Same question to make a json string from several columns of the same
>> record?
>> E.g json_column =3D {"field1":col1, "field2":col2...}
>>
>> I work with 20 millions length files and it sounds pretty ineffective to
>> iterate over each records.
>> Can someone tell me if it's possible or if I have to change my mind about
>> this?
>>
>>
>> Thanks in advance, all the best
>>
>> François Lacombe
>>
>>
>


Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-07-06 Thread Rinat
Hi Mingey !

I’ve implemented the group of tests, that shows that problem exists only when 
part suffix is specified and file in pending state exists

here is an exception

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)
  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: 
/var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzmgn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
at 
org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)


You could add the following test to the 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.class

@Test//(expected = IOException.class)
   public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
  throws Exception {
  testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
   }

   private void testThatPartIndexIsIncremented(String partSuffix, String 
existingPartFile) throws Exception {
  File outDir = tempFolder.newFolder();
  long inactivityInterval = 100;

  java.nio.file.Path bucket = Paths.get(outDir.getPath());
  Files.createFile(bucket.resolve(existingPartFile));

  String basePath = outDir.getAbsolutePath();
  BucketingSink sink = new BucketingSink(basePath)
 .setBucketer(new BasePathBucketer<>())
 .setInactiveBucketCheckInterval(inactivityInterval)
 .setInactiveBucketThreshold(inactivityInterval)
 .setPartPrefix(PART_PREFIX)
 .setInProgressPrefix("")
 .setPendingPrefix("")
 .setValidLengthPrefix("")
 .setInProgressSuffix(IN_PROGRESS_SUFFIX)
 .setPendingSuffix(PENDING_SUFFIX)
 .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
 .setPartSuffix(partSuffix)
 .setBatchSize(0);

  OneInputStreamOperatorTestHarness testHarness = 
createTestSink(sink, 1, 0);
  testHarness.setup();
  testHarness.open();

  testHarness.setProcessingTime(0L);

  testHarness.processElement(new StreamRecord<>("test1", 1L));

  testHarness.setProcessingTime(101L);
  testHarness.snapshot(0, 0);
  testHarness.notifyOfCompletedCheckpoint(0);
  sink.close();

  String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + 
partSuffix;
//assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
   }

And check, that test fails

it’s actual for the current master branch, also I’ve implemented a PR, that 
fixes this problem (https://github.com/apache/flink/pull/6176 
)

For some reasons, I still couldn’t compile the whole flink repository, to run 
your example locally from IDE, but from my point of view, problem exists, and 
the following test shows it’s existance, please, have a look

I’m working on flink project assembly on my local machine …

Thx


> On 25 Jun 2018, at 10:44, Rinat  wrote:
> 
> Hi Mingey !
> 
> Thx for your reply, really, have no idea why everything works in your case, I 
> have implemented unit tests in my PR which shows, that problem exists. 
> Please, let me know which Flink version do you use ?
> Current fix is actual for current master branch, here it an example of unit 
> test, that shows the problem
> 
> @Test
> public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws 
> Exception {
>String partSuffix = ".my";
> 
>File outDir = tempFolder.newFolder();
>long inactivityInterval = 100;
> 
>java.nio.file.Path bucket = Paths.get(outDir.getPath());

Re: Filter columns of a csv file with Flink

2018-07-06 Thread Hequn Cheng
Hi francois,

If I understand correctly, you can use sql or table-api to solve you
problem.
As you want to project part of columns from source, a columnar storage like
parquet/orc would be efficient. Currently, ORC table source is supported in
flink, you can find more details here[1]. Also, there are many other table
sources[2] you can choose. With a TableSource, you can read the data and
register it as a Table and do table operations through sql[3] or
table-api[4].

To make a json string from several columns, you can write a user defined
function[5].

I also find a OrcTableSourceITCase[6] which I think may be helpful for you.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#orctablesource
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#table-sources-sinks
[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html
[4]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
[5]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html
[6]
https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java


On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi all,
>
> I'm a new user to Flink community. This tool sounds great to achieve some
> data loading of millions-rows files into a pgsql db for a new project.
>
> As I read docs and examples, a proper use case of csv loading into pgsql
> can't be found.
> The file I want to load isn't following the same structure than the table,
> I have to delete some columns and make a json string from several others
> too prior to load to pgsql
>
> I plan to use Flink 1.5 Java API and a batch process.
> Does the DataSet class is able to strip some columns out of the records I
> load or should I iterate over each record to delete the columns?
>
> Same question to make a json string from several columns of the same
> record?
> E.g json_column =3D {"field1":col1, "field2":col2...}
>
> I work with 20 millions length files and it sounds pretty ineffective to
> iterate over each records.
> Can someone tell me if it's possible or if I have to change my mind about
> this?
>
>
> Thanks in advance, all the best
>
> François Lacombe
>
>


Re: Limiting in flight data

2018-07-06 Thread Vishal Santoshi
Further if there is are metrics that allows us to chart delays per pipe on
n/w buffers, that would be immensely helpful.

On Fri, Jul 6, 2018 at 10:02 AM, Vishal Santoshi 
wrote:

> Awesome, thank you for pointing that out. We have seen stability on pipes
> where previously throttling the source ( rateLimiter ) was the only way out.
>
> https://github.com/apache/flink/blob/master/flink-core/
> src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L291
>
> This though seems to be a cluster wide setting. Is it possible to do this
> at an operator level ?  Does this work with the pipe level configuration
> per job ( or has that been deprecated )
>
> On Thu, Jul 5, 2018 at 11:16 PM, Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
>
>> Hi Vishal,
>>
>> Before Flink-1.5.0, the sender tries best to send data on the network
>> until the wire is filled with data. From Flink-1.5.0
>> the network flow control is improved by credit-based idea. That mea
>> ns the sender transfers data based on how many buffers
>> avaiable on receiver side, so there will be no data accumulated on the wire. 
>> From
>> this point, the in-flighting data is less than before.
>>
>> Also you can further limit the in-flighting data by controling the number
>> of credits on receiver side, and the related parameters are
>> taskmanager.network.memory.buffers-per-channel and
>> taskmanager.network.memory.floating-buffers-per-gate.
>>
>> If you have other questions about them, let me know then i can explain
>> for you.
>>
>> Zhijiang
>>
>> --
>> 发件人:Vishal Santoshi 
>> 发送时间:2018年7月5日(星期四) 22:28
>> 收件人:user 
>> 主 题:Limiting in flight data
>>
>> "Yes, Flink 1.5.0 will come with better tools to handle this problem.
>> Namely you will be able to limit the “in flight” data, by controlling the
>> number of assigned credits per channel/input gate. Even without any
>> configuring Flink 1.5.0 will out of the box buffer less data, thus
>> mitigating the problem."
>>
>> I read this in another email chain. The docs ( may be you can point me to
>> them ) are not very clear on how to do the above. Any pointers will be
>> appreciated.
>>
>> Thanks much.
>>
>>
>>
>


Re: Limiting in flight data

2018-07-06 Thread Vishal Santoshi
Awesome, thank you for pointing that out. We have seen stability on pipes
where previously throttling the source ( rateLimiter ) was the only way out.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L291

This though seems to be a cluster wide setting. Is it possible to do this
at an operator level ?  Does this work with the pipe level configuration
per job ( or has that been deprecated )

On Thu, Jul 5, 2018 at 11:16 PM, Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Vishal,
>
> Before Flink-1.5.0, the sender tries best to send data on the network
> until the wire is filled with data. From Flink-1.5.0
> the network flow control is improved by credit-based idea. That
> means the sender transfers data based on how many
> buffers avaiable on receiver side, so there will be no
> data accumulated on the wire. From this point, the in-flighting data is
> less than before.
>
> Also you can further limit the in-flighting data by controling the number
> of credits on receiver side, and the related parameters are
> taskmanager.network.memory.buffers-per-channel and
> taskmanager.network.memory.floating-buffers-per-gate.
>
> If you have other questions about them, let me know then i can explain for
> you.
>
> Zhijiang
>
> --
> 发件人:Vishal Santoshi 
> 发送时间:2018年7月5日(星期四) 22:28
> 收件人:user 
> 主 题:Limiting in flight data
>
> "Yes, Flink 1.5.0 will come with better tools to handle this problem.
> Namely you will be able to limit the “in flight” data, by controlling the
> number of assigned credits per channel/input gate. Even without any
> configuring Flink 1.5.0 will out of the box buffer less data, thus
> mitigating the problem."
>
> I read this in another email chain. The docs ( may be you can point me to
> them ) are not very clear on how to do the above. Any pointers will be
> appreciated.
>
> Thanks much.
>
>
>


flink rocksdb where to configure mount point

2018-07-06 Thread Siew Wai Yow
Hi,


We configure rocksdb as statebackend and checkpoint dir persists to hdfs. When 
the job is run, rocksdb automatically mount to tmpfs /tmp, which consume memory.


RocksDBStateBackend rocksdb = new RocksDBStateBackend(new 
FsStateBackend(hdfs://), true);
env.setStateBackend(rocksdb);


Questions,

  1.  What happen if large state is larger than the tmpfs memory size? Will 
flink do something?
  2.  Is it possible to configure rocksdb dir via flink?

Thank you.

Regards,
Yow


Filter columns of a csv file with Flink

2018-07-06 Thread françois lacombe
Hi all,

I'm a new user to Flink community. This tool sounds great to achieve some
data loading of millions-rows files into a pgsql db for a new project.

As I read docs and examples, a proper use case of csv loading into pgsql
can't be found.
The file I want to load isn't following the same structure than the table,
I have to delete some columns and make a json string from several others
too prior to load to pgsql

I plan to use Flink 1.5 Java API and a batch process.
Does the DataSet class is able to strip some columns out of the records I
load or should I iterate over each record to delete the columns?

Same question to make a json string from several columns of the same record?
E.g json_column =3D {"field1":col1, "field2":col2...}

I work with 20 millions length files and it sounds pretty ineffective to
iterate over each records.
Can someone tell me if it's possible or if I have to change my mind about
this?


Thanks in advance, all the best

François Lacombe


Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-07-06 Thread Vishal Santoshi
Yep, pwrfect, that we do.  Can you confirm though that jobs will restart in
the case of a failover ? That is what we see and that is fine..

On Fri, Jul 6, 2018, 8:24 AM Chesnay Schepler  wrote:

> If i remember correctly the masters file is only used by the
> [start|stop]-cluster.sh scripts to determine how many JobManagers should be
> started / stopped and which port they should use.
>
> it's not necessarily *required*, but without it you have to manually
> start/stop all jobmanagers.
>
> On 06.07.2018 14:08, Vishal Santoshi wrote:
>
> Hello Chesnay, I have used an HA setup without the masters file and have
> seen failover happen based on alerts from a leader election routine Is
> it actually required that there be a masters file when there is a central
> arbiterer ZK  that has the alive JMs and a call back to force TMs to switch
> to a new leader in case of failure...
>
> On Tue, Jun 5, 2018, 6:45 AM Chesnay Schepler  wrote:
>
>> Please look into high-availability
>> 
>> to make your cluster resistant against shutdowns.
>>
>> On 05.06.2018 12:31, makeyang wrote:
>>
>> can anybody share anythoughts, insights about this issue?
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>


Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-07-06 Thread Chesnay Schepler
If i remember correctly the masters file is only used by the 
[start|stop]-cluster.sh scripts to determine how many JobManagers should 
be started / stopped and which port they should use.


it's not necessarily /required/, but without it you have to manually 
start/stop all jobmanagers.


On 06.07.2018 14:08, Vishal Santoshi wrote:
Hello Chesnay, I have used an HA setup without the masters file and 
have seen failover happen based on alerts from a leader election 
routine Is it actually required that there be a masters file when 
there is a central arbiterer ZK  that has the alive JMs and a call 
back to force TMs to switch to a new leader in case of failure...


On Tue, Jun 5, 2018, 6:45 AM Chesnay Schepler > wrote:


Please look into high-availability


to make your cluster resistant against shutdowns.

On 05.06.2018 12:31, makeyang wrote:

can anybody share anythoughts, insights about this issue?



--
Sent 
from:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/







Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-07-06 Thread Vishal Santoshi
Even though I must admit that the jobs restart but they do restart
successfully  with the new JM.

On Fri, Jul 6, 2018, 8:08 AM Vishal Santoshi 
wrote:

> Hello Chesnay, I have used an HA setup without the masters file and have
> seen failover happen based on alerts from a leader election routine Is
> it actually required that there be a masters file when there is a central
> arbiterer ZK  that has the alive JMs and a call back to force TMs to switch
> to a new leader in case of failure...
>
> On Tue, Jun 5, 2018, 6:45 AM Chesnay Schepler  wrote:
>
>> Please look into high-availability
>> 
>> to make your cluster resistant against shutdowns.
>>
>> On 05.06.2018 12:31, makeyang wrote:
>>
>> can anybody share anythoughts, insights about this issue?
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>


Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-07-06 Thread Vishal Santoshi
Hello Chesnay, I have used an HA setup without the masters file and have
seen failover happen based on alerts from a leader election routine Is
it actually required that there be a masters file when there is a central
arbiterer ZK  that has the alive JMs and a call back to force TMs to switch
to a new leader in case of failure...

On Tue, Jun 5, 2018, 6:45 AM Chesnay Schepler  wrote:

> Please look into high-availability
> 
> to make your cluster resistant against shutdowns.
>
> On 05.06.2018 12:31, makeyang wrote:
>
> can anybody share anythoughts, insights about this issue?
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: A use-case for Flink and reactive systems

2018-07-06 Thread Fabian Hueske
Hi Yersinia,

let me reply to some of your questions. I think these answers should also
address most of Mich's questions as well.

> What you are saying is I can either use Flink and forget database layer,
or make a java microservice with a database. Mixing Flink with a Database
doesn't make any sense.
> My concerns with the database is how do you work out the previous state
to calculate the new one? Is it good and fast? (moving money from account A
to B isn't a problem cause you have two separate events).

Well, you could implement a Flink application that reads and writes account
data from/to an external database but that voids the advantages of Flink's
state management.
A stateful Flink application can maintain the accounts as state (in memory
or in RocksDB on disk) and just push updates to an external DB from which
external services read. As you said, you can also use queryable state to
avoid using an external DB, but that means the state is only accessible
when the Flink application is running.

> This simple CEP example requires you to mine previous data/states from a
DB, right? Can Flink be considered for it without an external DB but only
relying on its internal RockDB ?

The CEP library also keeps all data to evaluate a pattern in local state.
Again, this can be in-memory JVM or RocksDB but not an external DB.

> DB size is curious, what about if I plan to use it in a true big
environment? We used a financial system, so if I am BNP Paribas, HSBC or
VISA and I want to process incoming transactions to update the balance.
Flink is into receiving transactions and updating the balance (state).
However, I have 100 millions of accounts, so even scaling Flink I might
have some storage limit.

Flink maintains its state in a so-called state backend. We have state
backends that store all data in the JVM heap or in a local, embedded
RocksDB instance on disk.
Also state is typically partitioned on a key and can be spread over many
nodes.
We have users with applications that maintain several terabytes of state
(10+ TB).

> I got stuck understanding that Flink don't work with the DB, except for
RockDB which it can be implemented internally. Hence, unless I redefine my
cases (which I aim to do), Flink isn't the best choice here.

Flink works best if it can manage all required data in its own state and
locally read and update it.
Flink manages state in JVM memory or an embedded RocksDB instance (embedded
means you don't have to set up anything. All of that happens automatically).
You can store and access the state in an external DB, but that diminishes
Flink's advantages of local state management. There is an AsyncIO operator
for such operations.

Pushing the result of an application (immediately or in intervals) to an
external data store like Kafka, JDBC, Cassandra or any other database, is
fine.
However, when writing to an external DB, you should think about the
consistency guarantees that you need.

Best, Fabian


2018-07-05 18:30 GMT+02:00 Yersinia Ruckeri :

> I guess my initial bad explanation caused confusion.
> After reading again docs I got your points. I can use Flink for online
> streaming processing, letting it to manage the state, which can be
> persisted in a DB asynchronously to ensure savepoints and using queryable
> state to make the current state available for queries (I know this API can
> change, but let's assume it's ok for now).
> DB size is curious, what about if I plan to use it in a true big
> environment? We used a financial system, so if I am BNP Paribas, HSBC or
> VISA and I want to process incoming transactions to update the balance.
> Flink is into receiving transactions and updating the balance (state).
> However, I have 100 millions of accounts, so even scaling Flink I might
> have some storage limit.
>
> My research around Flink was for investigating two cases:
> 1. see if and how Flink can be put into an event-sourcing based
> application using CQRS to process an ongoing flow of events without
> specifically coding an application (which might be a microservice) that
> make small upsert into a DB (keep the case of a constant flow of
> transactions which determine a balance update in a dedicated service)
> 2. Using CEP to trigger specific events based on a behaviour you have been
> following. Take the case of sensors I described or supermarket points
> systems: I want to give 1k points to all customers who bought tuna during
> last 3 months and spent more than 100 euro per weeks and installed
> supermarket mobile app since the beginning of the year. I want to do it
> online processing the flow, rather than triggering an offline routine which
> mines your behaviour.
>
> I got stuck understanding that Flink don't work with the DB, except for
> RockDB which it can be implemented internally. Hence, unless I redefine my
> cases (which I aim to do), Flink isn't the best choice here.
>
> Y
>
> On Thu, Jul 5, 2018 at 2:09 PM, Mich Talebzadeh  > wrote:
>
>> Hi,
>>
>> What you are saying is 

Re: Slide Window Compute Optimization

2018-07-06 Thread Fabian Hueske
Hi Yennie,

You might want to have a look at the OVER windows of Flink's Table API or
SQL [1].

An OVER window computes an aggregate (such as a count) for each incoming
record over a range of previous events.
For example the query:

SELECT ip, successful, COUNT(*) OVER (PARTITION BY ip, successful ORDER BY
loginTime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
  FROM logins;

computes for each login attempt the number of login attempts of the
previous hour.

There is no corresponding built-in operator in the DataStream API but SQL
and Table API queries can be very easily integrated with DataStream
programs [2].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

2018-07-06 8:01 GMT+02:00 YennieChen88 :

> Hi Kostas and Rong,
> Thank you for your reply.
> As both of you ask for more info about my use case, I now reply in
> unison.
> My case is used for counting the number of successful login and
> failures
> within one hour, keyBy other login related attributes (e.g. ip, device,
> login type ...). According to the count result of the previous hour, to
> judge whether the next login is compliant.
> We have a high requirement for the flink compute time, to reduce the
> impact on user login. From receiving source to sinking results into
> database, only about 10ms time is acceptable. Base on this, we expect the
> compute result as accurate as possible. The best case without error is the
> latest sink time after 1-hour compute exactly the same as the user login
> time which need judge compliance. Is that means the smaller the step size
> of
> slide window, the more accurate the results? But Now it seems that the
> smaller step size of slide window,the longer time need to compute. Because
> once a element arrives, it will be processed in every window (number of
> windows = window size/step size)serially through one thread.
>
> Rong Rong wrote
> > Hi Yennie,
> >
> > AFAIK, the sliding window will in fact duplicate elements into multiple
> > different streams. There's a discussion thread regarding this [1].
> > We are looking into some performance improvement, can you provide some
> > more
> > info regarding your use case?
> >
> > --
> > Rong
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-7001
> >
> > On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas 
>
> > k.kloudas@
>
> > 
> > wrote:
> >
> >> Hi,
> >>
> >> You are correct that with sliding windows you will have 3600 “open
> >> windows” at any point.
> >> Could you describe a bit more what you want to do?
> >>
> >> If you simply want to have an update of something like a counter every
> >> second, then you can
> >> implement your own logic with a ProcessFunction that allows to handle
> >> state and timers in a
> >> custom way (see [1]).
> >>
> >> Hope this helps,
> >> Kostas
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/
> operators/process_function.html
> >>
> >>
> >> On Jul 5, 2018, at 12:12 PM, YennieChen88 
>
> > chenyanying3@
>
> >  wrote:
> >>
> >> Hi,
> >>I want to use slide windows of 1 hour window size and 1 second step
> >> size. I found that once a element arrives, it will be processed in 3600
> >> windows serially through one thread. It takes serveral seconds to finish
> >> one
> >> element processing,much more than my expection. Do I have any way to
> >> optimizate it?
> >>Thank you very much for your reply.
> >>
> >>
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >>
> >>
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Dynamic Rule Evaluation in Flink

2018-07-06 Thread Puneet Kinra
Hi Fabian

I know you can connect 2 streams with heterogeneous schema using connect
function.
that has only one port or one parameter.
can you send more than one heterogeneous stream to connect.

On Thu, Jul 5, 2018 at 6:37 PM, Fabian Hueske  wrote:

> Hi,
>
> > Flink doesn't support connecting multiple streams with heterogeneous
> schema
>
> This is not correct.
> Flink is very well able to connect streams with different schema. However,
> you cannot union two streams with different schema.
> In order to reconfigure an operator with changing rules, you can use
> BroadcastProcessFunction or KeyedBroadcastProcessFunction [1].
>
> In order to dynamically reconfigure aggregations and windowing, you would
> need to implement the processing logic yourself in the process function
> using state and timers.
> There is no built-in support to reconfigure such operators.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/stream/state/broadcast_state.html
>
>
> 2018-07-05 14:41 GMT+02:00 Puneet Kinra 
> :
>
>> Hi Aarti
>>
>> Flink doesn't support connecting multiple streams with heterogeneous
>> schema ,you can try the below solution
>>
>> a) If stream A is sending some events make the output of that as
>> String/JsonString.
>>
>> b) If stream B is sending some events make the output of that as
>> String/JsonString.
>>
>> c) Now Using union function you can connect all the streams & use
>> FlatMap or process function to
>> evaluate all these streams against your defined rules.
>>
>> d) For Storing your aggregations and rules you can build your cache layer
>> and pass as a argument
>> to the constructor of that flatmap.
>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Jul 2, 2018 at 2:38 PM, Aarti Gupta  wrote:
>>
>>> Hi,
>>>
>>> We are currently evaluating Flink to build a real time rule engine that
>>> looks at events in a stream and evaluates them against a set of rules.
>>>
>>> The rules are dynamically configured and can be of three types -
>>> 1. Simple Conditions - these require you to look inside a single event.
>>> Example, match rule if A happens.
>>> 2. Aggregations - these require you to aggregate multiple events.
>>> Example, match rule if more than five A's happen.
>>> 3. Complex patterns - these require you to look at multiple events and
>>> detect patterns. Example, match rule if A happens and then B happens.
>>>
>>> Since the rules are dynamically configured, we cannot use CEP.
>>>
>>> As an alternative, we are using connected streams and the CoFlatMap
>>> function to store the rules in shared state, and evaluate each incoming
>>> event against the stored rules.  Implementation is similar to what's
>>> outlined here
>>> 
>>> .
>>>
>>> My questions -
>>>
>>>1. Since the CoFlatMap function works on a single event, how do we
>>>evaluate rules that require aggregations across events. (Match rule if 
>>> more
>>>than 5 A events happen)
>>>2. Since the CoFlatMap function works on a single event, how do we
>>>evaluate rules that require pattern detection across events (Match rule 
>>> if
>>>A happens, followed by B).
>>>3. How do you dynamically define a window function.
>>>
>>>
>>> --Aarti
>>>
>>>
>>> --
>>> Aarti Gupta 
>>> Director, Engineering, Correlation
>>>
>>>
>>> aagu...@qualys.com
>>> T
>>>
>>>
>>> Qualys, Inc. – Blog  | Community
>>>  | Twitter 
>>>
>>>
>>> 
>>>
>>
>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Slide Window Compute Optimization

2018-07-06 Thread YennieChen88
Hi Kostas and Rong,
Thank you for your reply.
As both of you ask for more info about my use case, I now reply in
unison.
My case is used for counting the number of successful login and failures
within one hour, keyBy other login related attributes (e.g. ip, device,
login type ...). According to the count result of the previous hour, to
judge whether the next login is compliant.
We have a high requirement for the flink compute time, to reduce the
impact on user login. From receiving source to sinking results into
database, only about 10ms time is acceptable. Base on this, we expect the
compute result as accurate as possible. The best case without error is the
latest sink time after 1-hour compute exactly the same as the user login
time which need judge compliance. Is that means the smaller the step size of
slide window, the more accurate the results? But Now it seems that the
smaller step size of slide window,the longer time need to compute. Because
once a element arrives, it will be processed in every window (number of
windows = window size/step size)serially through one thread.

Rong Rong wrote
> Hi Yennie,
> 
> AFAIK, the sliding window will in fact duplicate elements into multiple
> different streams. There's a discussion thread regarding this [1].
> We are looking into some performance improvement, can you provide some
> more
> info regarding your use case?
> 
> --
> Rong
> 
> [1] https://issues.apache.org/jira/browse/FLINK-7001
> 
> On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas 

> k.kloudas@

> 
> wrote:
> 
>> Hi,
>>
>> You are correct that with sliding windows you will have 3600 “open
>> windows” at any point.
>> Could you describe a bit more what you want to do?
>>
>> If you simply want to have an update of something like a counter every
>> second, then you can
>> implement your own logic with a ProcessFunction that allows to handle
>> state and timers in a
>> custom way (see [1]).
>>
>> Hope this helps,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>>
>>
>> On Jul 5, 2018, at 12:12 PM, YennieChen88 

> chenyanying3@

>  wrote:
>>
>> Hi,
>>I want to use slide windows of 1 hour window size and 1 second step
>> size. I found that once a element arrives, it will be processed in 3600
>> windows serially through one thread. It takes serveral seconds to finish
>> one
>> element processing,much more than my expection. Do I have any way to
>> optimizate it?
>>Thank you very much for your reply.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/