Re: Question about Timestamp in Flink SQL

2017-11-28 Thread wangsan
Hi Xincan,

Thanks for your reply. 

The system default timezone is just what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
 
I looked into the generated code, and I found the following code snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And what the `internalToTimestamp` function does is:

```
public static Timestamp internalToTimestamp(long v) {
return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, I get 
Timestamp(-28800000). I am confused: why does `internalToTimestamp` need to 
subtract the offset?
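
For reference, here is a minimal, self-contained test (my own sketch, not Flink or Calcite code) that reproduces the shift, assuming the default time zone is Asia/Shanghai:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Same logic as org.apache.calcite.runtime.SqlFunctions.internalToTimestamp
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
    }

    public static void main(String[] args) {
        // With Asia/Shanghai (UTC+8) the offset is 28800000 ms, so an internal
        // value of 0 becomes Timestamp(-28800000), which toString() renders as
        // "1970-01-01 00:00:00.0" in the local time zone.
        System.out.println(internalToTimestamp(0L));
        System.out.println(internalToTimestamp(0L).getTime());
    }
}
```

So the subtraction makes Timestamp#toString() show the same wall-clock digits as the internal value, but the underlying epoch milliseconds end up shifted by the zone offset, which looks like the 8-hour difference described in this thread.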

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui  wrote:
> 
> Hi wangsan,
> 
> in Flink, the ProcessingTime is just implemented by invoking 
> System.currentTimeMillis() and the long value will be automatically wrapped 
> to a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check your TimeZone.getDefault() to see if it returns the right 
> TimeZone. Generally, the returned value should rely on the default TimeZone 
> of your operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan  > wrote:
> Hi all,
> 
> While using Timestamp in Flink SQL, how can I set timezone info? My 
> current timezone is GMT+8, and I found the selected processing time is always 
> 8 hours later than the current time, and so is the extracted event time.
> 
> Here’s my simplified code:
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
> Locale.CHINA).format(new Date())}")
> 
> val stream: DataStream[(String, String, String)] = 
> senv.socketTextStream("localhost", ).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> And here’s the result:
> 
> 
> 
> Best,
> wangsan
> 



Re: user driven stream processing

2017-11-28 Thread Tony Wei
Hi KZ,

https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
This article seems to be a good example of triggering a new calculation on a
running job. Maybe it can help you.
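
In case it is useful, here is a rough sketch (all class names, ports and the parameter format are made up, and the plain in-memory fields are neither fault-tolerant nor parallel-safe) of the general pattern from that article: keep one job running and feed user parameters in as a second, connected control stream instead of starting one job per parameter:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class UserDrivenCounts {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Main stream of failure reasons and a second stream of user-supplied
        // parameters; the socket sources are only placeholders.
        DataStream<String> failures = env.socketTextStream("localhost", 9000);
        DataStream<String> userParams = env.socketTextStream("localhost", 9001);

        failures
            .connect(userParams)
            .flatMap(new CoFlatMapFunction<String, String, Tuple2<String, Long>>() {

                private final Set<String> activeReasons = new HashSet<>();
                private final Map<String, Long> counts = new HashMap<>();

                @Override
                public void flatMap1(String reason, Collector<Tuple2<String, Long>> out) {
                    // Count only failures matching a reason some user asked for.
                    if (activeReasons.contains(reason)) {
                        long c = counts.merge(reason, 1L, Long::sum);
                        out.collect(Tuple2.of(reason, c));
                    }
                }

                @Override
                public void flatMap2(String newReason, Collector<Tuple2<String, Long>> out) {
                    // A user request simply registers a new parameter on the running job.
                    activeReasons.add(newReason);
                }
            })
            .print();

        env.execute("user-driven failure counts");
    }
}
```

For a production setup the parameter stream would typically be broadcast to all parallel instances and the counts kept in checkpointed state, but the shape of the solution is the same.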

Best Regards,
Tony Wei

2017-11-29 4:53 GMT+08:00 zanqing zhang :

> Hi All,
>
> Has anyone done any stream processing driven by a user request? What's the
> recommended way of doing this? Or is this completely the wrong direction to
> go for applications running on top of Flink?
>
> Basically, we need to tweak the stream processing based on parameters
> provided by a user, e.g. show me the total # of application failures due to
> "ABC", which is provided by the user. We are thinking of starting a Flink
> job with "ABC" as a parameter, but this would result in a huge number of
> Flink jobs. Is there a better way to do this? Can we trigger the calculation
> on a running job?
>
> Thanks in advance.
>
> KZ
>
>


Re: How to perform efficient DataSet reuse between iterations

2017-11-28 Thread Miguel Coimbra
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration
of the loop, and they all write to disk when the single, bigger plan is
executed (roughly as sketched below).
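
Concretely, the driver loop now looks roughly like the fragment below (paths and the per-step transformation `runOneStep` are placeholders of mine, not real code from the job):

```java
// One sink per step, a single execute() at the end.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Long, Long>> current = env.readCsvFile("input-edges.csv")  // placeholder path
        .fieldDelimiter("\t")
        .types(Long.class, Long.class);

for (int i = 0; i < 4; i++) {
    current = runOneStep(current);  // placeholder for the per-iteration graph algorithm
    // Each step gets its own sink; nothing runs yet.
    current.writeAsCsv("output-step-" + i, FileSystem.WriteMode.OVERWRITE);
}

// One job execution covers all registered sinks, so earlier steps are not
// recomputed once per intermediate count() call.
env.execute("all steps in one plan");
```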

I have one more question: I know I can retrieve the total time this single
job takes to execute, but what if I want to know the time taken by
specific operators in the DAG?
Is there some functionality in the Flink Batch API akin to counting
elements, but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel
or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's
tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the
total operator execution time;
b) know the time taken by each parallel component of the operator's
execution so I could know where and what was the "lagging element" in the
operator's execution.

Is this possible? I was hoping I could retrieve this information in the
Java program itself and avoid processing logs.
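
(One workaround that occurs to me, rather than a built-in feature: wrap the operator's user function in a Rich* function that measures wall-clock time per parallel subtask between open() and close() and reports it through an accumulator; the per-subtask values are then readable from the JobExecutionResult. The mapper below is only a made-up sketch of that idea.)

```java
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TimedMapper extends RichMapFunction<Long, Long> {

    private transient long startNanos;

    @Override
    public void open(Configuration parameters) {
        startNanos = System.nanoTime();
    }

    @Override
    public Long map(Long value) {
        return value * 2;  // placeholder for the operator's real work
    }

    @Override
    public void close() {
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        LongCounter timer = new LongCounter();
        timer.add(elapsedMillis);
        // One accumulator per subtask makes the "lagging" parallel instance visible.
        getRuntimeContext().addAccumulator(
                "map-time-subtask-" + getRuntimeContext().getIndexOfThisSubtask(), timer);
    }
}
```

The values would then show up next to the normal counters in env.getLastJobExecutionResult().getAllAccumulatorResults(); note that this measures the open-to-close lifetime of each subtask (including time spent waiting for input), not pure compute time.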

Thanks again.

Best regards,


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra

On 28 November 2017 at 08:56, Fabian Hueske  wrote:

> Hi,
>
> by calling result.count(), you compute the complete plan from the
> beginning and not just the operations you added since the last execution.
> Looking at the output you posted, each step takes about 15 seconds (with
> about 5 secs of initialization).
> So the 20 seconds of the first step include initialization + 1st step.
> The 35 seconds on the second step include initialization, 1st step + 2nd
> step.
> If you don't call count on the intermediate steps, you can compute the 4th
> step in 65 seconds.
>
> Implementing a caching operator would be a pretty huge effort because you
> need to touch code at many places such as the API, optimizer, runtime,
> scheduling, etc.
> The documentation you found should still be applicable. There haven't been
> major additions to the DataSet API and runtime in the last releases.
>
> Best, Fabian
>
>
>
> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra :
>
>> Hello Fabian,
>>
>> Thank you for the reply.
>> I was hoping the situation had in fact changed.
>>
>> As far as I know, I am not calling execute() directly even once - it is
>> being called implicitly by simple DataSink elements added to the plan
>> through count():
>>
>> System.out.println(String.format("%d-th graph algorithm produced %d
>> elements. (%d.%d s).",
>> executionCounter,
>> *result.count()*, // this would trigger
>> execution...
>> env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.SECONDS),
>> env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>
>>
>> I have taken a look at Flink's code base (e.g. how the dataflow dag is
>> processed with classes such as  
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>> sure on the most direct way to achieve this.
>> Perhaps I missed some online documentation that would help to get a grip
>> on how to contribute to the different parts of Flink?
>>
>> I did find some information which hints at implementing this sort of
>> thing (such as adding custom operators) but it was associated to an old
>> version of Flink:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
>> However, as far as I know there is no equivalent page in the current
>> online stable or snapshot documentation.
>>
>> What would be the best way to go about this?
>>
>> It really seems that the DataSet stored in the result variable is always
>> representing an increasing sequence of executions and not just the results
>> of the last execution.
>>
>>
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com 
>> Skype: miguel.e.coimbra
>>
>> On 27 November 2017 at 22:56, Fabian Hueske  wrote:
>>
>>> Hi Miguel,
>>>
>>> I'm sorry but AFAIK, the situation has not changed.
>>>
>>> Is it possible that you are calling execute() multiple times?
>>> In that case, the 1-st and 2-nd graph would be recomputed before the
>>> 3-rd graph is computed.
>>> That would explain the increasing execution time of 15 seconds.
>>>
>>> Best, Fabian
>>>
>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra :
>>>
 Hello,

 I'm facing a problem in an algorithm where I would like to constantly
 update a DataSet representing a graph, perform some computation,
 output one or more DataSink (such as a file on the local system) and
 then reuse the DataSet for a next iteration.
 ​I want to avoid spilling the results to disk at the end of an
 iteration and to read it back in the next iterations - the graph is very
 big and I do not wish to incur that time overhead.
 I want to reuse

Re: S3 Access in eu-central-1

2017-11-28 Thread Dominik Bruhn

Hey Stephan, Hey Steve,
that was the right hint, adding that option to the Java options fixed the 
problem. Maybe we should add this somehow to our Flink Wiki?
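
For the archives, this is what I understand the fix to look like (property and config names as I know them from the Stack Overflow answer linked below, so please double-check):

```
# flink-conf.yaml: pass the AWS SDK system property that enables V4 signing
env.java.opts: -Dcom.amazonaws.services.s3.enableV4

# core-site.xml (as before): point s3a at the region's endpoint
#   fs.s3a.endpoint = s3.eu-central-1.amazonaws.com
```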


Thanks!

Dominik

On 28/11/17 11:55, Stephan Ewen wrote:
Got a pointer from Steve that this is answered on Stack Overflow here: 
https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version 



Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller 
footprint, compatible across Hadoop versions, and based on a later s3a 
and AWS SDK. In that connector, it should work out of the box because it 
uses a later AWS SDK. You can also use it with earlier Hadoop versions 
because dependencies are relocated, so it should not clash/conflict.





On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen > wrote:


Hi!

The endpoint config entry looks correct.
I was looking at this issue to see if there are pointers to anything
else, but it looks like the explicit endpoint entry is the most
important thing: https://issues.apache.org/jira/browse/HADOOP-13324


I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
pulling you in again - listening and learning still about the subtle
bits and pieces of S3).
@Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
only in Hadoop 2.8?

Best,
Stephan


On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn mailto:domi...@dbruhn.de>> wrote:

Hey,
can anyone give a hint? Does anyone have flink running with an
S3 Bucket in Frankfurt/eu-central-1 and can share his config and
setup?

Thanks,
Dominik

On 22. Nov 2017, at 17:52, domi...@dbruhn.de
 wrote:


Hey everyone,
I've been trying for hours to get Flink 1.3.2 (downloaded for
Hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
hosted in the eu-central-1 region. Everything works fine for
other regions. I'm running my job on a JobTracker in local
mode. I googled around and found several hints, most of
them saying that setting `fs.s3a.endpoint` should solve
it. It doesn't. I'm also sure that the core-site.xml (see
below) is picked up: if I put garbage into the endpoint, I
receive a hostname-not-found error.

The exception I'm getting is:
com.amazonaws.services.s3.model.AmazonS3Exception: Status
Code: 400, AWS Service: Amazon S3, AWS Request ID:
432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
Request, S3 Extended Request ID:

1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=

I read the AWS FAQ but I don't think that

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request


applies to me as I'm not running the NativeFileSystem.

I suspect this is related to the v4 signing protocol which is
required for S3 in Frankfurt. Could it be that the aws-sdk
version is just too old? I tried to play around with it but
the hadoop adapter is incompatible with newer versions.

I have the following core-site.xml:



 
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>something</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>wont-tell</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>
</configuration>







user driven stream processing

2017-11-28 Thread zanqing zhang
Hi All,

Has anyone done any stream processing driven by a user request? What's the
recommended way of doing this? Or is this completely the wrong direction to
go for applications running on top of Flink?

Basically, we need to tweak the stream processing based on parameters
provided by a user, e.g. show me the total # of application failures due to
"ABC", which is provided by the user. We are thinking of starting a Flink
job with "ABC" as a parameter, but this would result in a huge number of
Flink jobs. Is there a better way to do this? Can we trigger the calculation
on a running job?

Thanks in advance.

KZ


Re: Status of Kafka011JsonTableSink for 1.4.0 release?

2017-11-28 Thread Georgios Kaklamanos
Hi Fabian,

Thanks for the answer.

I had seen the Kafka Producer but, from a quick look, I didn't seem to
find something like a JSON Serialization Schema, which I need since the
next app in my pipeline expects to read the data in JSON.

So, hoping for a TableJSONSink, I didn't look into it further. I'll check it
now.
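
For reference, my understanding of that suggestion is roughly the fragment below (Flink 1.4 APIs assumed; `tableEnv` and the Table `result` are defined elsewhere; field names, topic and broker address are placeholders, and the hand-rolled schema does no JSON escaping):

```java
// Convert the Table to a DataStream<Row> and write it to Kafka 0.11 as JSON.
DataStream<Row> rows = tableEnv.toAppendStream(result, Row.class);

SerializationSchema<Row> jsonSchema = new SerializationSchema<Row>() {
    private final String[] fieldNames = {"col1", "col2", "col3"};  // placeholder field names

    @Override
    public byte[] serialize(Row row) {
        // Naive JSON rendering, illustration only (no escaping, all values as strings).
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(fieldNames[i]).append("\":\"")
              .append(row.getField(i)).append('"');
        }
        sb.append('}');
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
};

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

rows.addSink(new FlinkKafkaProducer011<>("output-topic", jsonSchema, props));
```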

Best,
George




On 11/27/2017 10:53 AM, Fabian Hueske wrote:
> Hi George,
> 
> Flink 1.4 will not include a KafkaTableSink for Kafka 0.11 but a
> DataStream API SinkFunction (KafkaProducer).
> As an alternative to using the Kafka010TableSink, you can also convert
> the result Table into a DataStream and use the KafkaProducer for Kafka
> 0.11 to emit the DataStream.
> 
> We will hopefully add a Kafka 0.11 TableSink in Flink 1.5, which is
> scheduled for early next year.
> 
> Best, Fabian
> 
> 2017-11-26 2:31 GMT+01:00 Georgios Kaklamanos
> mailto:georgios.kaklama...@gwdg.de>>:
> 
> Hello,
> 
> I'm new to Flink and I'd like to use it along with Kafka 0.11, to do
> some processing on a stream of JSON data.
> 
> I've been looking at the docs of the Table API (and the relevant GitHub
> code), and I see that recently there was a Kafka011JsonTableSource
> submitted, which will be available in the Flink 1.4 release.
> 
> Would there also be a Kafka011JsonTableSink in the near future (1.4
> release), or would I have to fall back to Kafka 0.10 for now?
> 
> Thank you for your time.
> 
> Best Regards,
> George
> 
> 
> 
> --
> --
> Georgios Kaklamanos
> Research Assistant, e-Science Group, GWDG
> mailto: georgios.kaklama...@gwdg.de 
> Telefon: 0551 201-26803
> --
> GWDG - Gesellschaft für wissenschaftliche
> Datenverarbeitung mbH Göttingen
> Am Faßberg 11, 37077 Göttingen, Germany
> 
> WWW: www.gwdg.de mailto: g...@gwdg.de
> 
> Phone: +49 (0) 551 201-1510 
> Fax:   +49 (0) 551 201-2150 
> --
> Geschäftsführer: Prof. Dr. Ramin Yahyapour
> Aufsichtsratsvorsitzender: Prof. Dr. Christian Griesinger
> Sitz der Gesellschaft: Göttingen
> Registergericht: Göttingen
> Handelsregister-Nr. B 598
> --
> Zertifiziert nach ISO 9001
> --
> 
> 
> 

-- 
--
Georgios Kaklamanos
Research Assistant, e-Science Group, GWDG
mailto: georgios.kaklama...@gwdg.de
Telefon: 0551 201-26803
--
GWDG - Gesellschaft für wissenschaftliche
Datenverarbeitung mbH Göttingen
Am Faßberg 11, 37077 Göttingen, Germany

WWW: www.gwdg.de  mailto: g...@gwdg.de
Phone: +49 (0) 551 201-1510
Fax:   +49 (0) 551 201-2150
--
Geschäftsführer: Prof. Dr. Ramin Yahyapour
Aufsichtsratsvorsitzender: Prof. Dr. Christian Griesinger
Sitz der Gesellschaft: Göttingen
Registergericht: Göttingen
Handelsregister-Nr. B 598
--
Zertifiziert nach ISO 9001
--





Re: How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
Hi Gordon,

explicitly specifying the serialVersionUID did the job, thank you! The
failing task was latest_time -> (cassandra-map -> Sink:
cassandra-active-sink, map_active_stream, map_history_stream), built like
the following:

val events = keyedstream
  .timeWindow(Time.seconds(20))
  .maxBy("field").name("latest-time")

CassandraSink.addSink(
   events.map(_.toCassandraTuple).name("cassandra-map").javaStream)
.setQuery(...)
.setClusterBuilder(...)
.build().name("cassandra-sink")

with cassandra-map, map_history_stream and map_active_stream being stateless
map functions.
So I guess the culprit was either the window/maxBy operator or the Cassandra
sink; probably the window/maxBy operator, since the exception refers to
initializing a keyed state backend.
I'm attaching the complete log.

Cheers,
Federico


2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai :

> Hi Federico,
>
> It seems like the state cannot be restored because the class of the state
> type (i.e., Event) had been modified since the savepoint, and therefore has
> a conflicting serialVersionUID with whatever it is in the savepoint.
> This can happen if Java serialization is used for some part of your state,
> and the class of the written data was modified while a fixed
> serialVersionUID was not explicitly specified for that class.
>
> To avoid this, you should explicitly set a serialVersionUID for the Event
> class.
> You can actually also do that now without losing state while also
> incorporating the modifications you were trying to do for your updated job.
> Explicitly declare the serialVersionUID of the Event class to what it was
> before your modifications (i.e., 8728793377341765980, according to your
> error log).
>
> One side question: are you experiencing this restore failure for one of
> your custom operator states, or is this failing state part of some Flink
> built-in operator / connector?
> I’m asking just to have an idea of which Flink built-in operator /
> connectors still use Java serialization for user state; ideally we would
> want that to be completely removed in the future.
>
> Cheers,
> Gordon
>
>
> On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi,
>
> I recently had to do a code update of a long running Flink Stream job
> (1.3.2) and on the restart from the savepoint I had to deal with:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>
> Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
> class incompatible: stream classdesc serialVersionUID = 8728793377341765980,
> local class serialVersionUID = -4253404384162522764
>
> because I have changed a method used to convert the Event to a Cassandra
> writable Tuple (in particular, I changed the return type from Tuple10 to
> Tuple11, after adding a field). I reverted those changes back since it
> wasn't much of a problem per se.
>
> Now, I understand the root cause of this issue, and I wanted to ask if
> there are any "best practices" to prevent this kind of issue without
> losing the state of the job by restarting it from the very
> beginning.
>
> --
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio
11/28/2017 14:53:28 latest_time -> (cassandra-map -> Sink: 
cassandra-active-sink, map_active_stream, map_history_stream)(1/1) switched to 
FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: 
lab.vardata.events.AirTrafficEventWithId; local class incompatible: stream 
classdesc serialVersionUID = 8728793377341765980, local class serialVersionUID 
= -4253404384162522764
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1484)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1334)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.read(TupleSerializerConfigSnapshot.java:64)
   

Re: How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Tzu-Li (Gordon) Tai
Hi Federico,

It seems like the state cannot be restored because the class of the state type 
(i.e., Event) had been modified since the savepoint, and therefore has a 
conflicting serialVersionUID with whatever it is in the savepoint.
This can happen if Java serialization is used for some part of your state, and 
the class of the written data was modified while a fixed serialVersionUID was 
not explicitly specified for that class.

To avoid this, you should explicitly set a serialVersionUID for the Event class.
You can actually also do that now without losing state while also incorporating 
the modifications you were trying to do for your updated job.
Explicitly declare the serialVersionUID of the Event class to what it was 
before your modifications (i.e., 8728793377341765980, according to your error 
log).
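
For illustration, a minimal sketch of what that looks like (fields omitted; the ID must match the one recorded in the savepoint, taken from your error log):

```java
import java.io.Serializable;

public class Event implements Serializable {

    // Pin the ID Java would otherwise derive implicitly, so that compatible
    // changes to the class no longer break restores from older savepoints.
    private static final long serialVersionUID = 8728793377341765980L;

    // fields, getters, setters ...
}
```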

One side question: are you experiencing this restore failure for one of your 
custom operator states, or is this failing state part of some Flink built-in 
operator / connector?
I’m asking just to have an idea of which Flink built-in operator / connectors 
still use Java serialization for user state; ideally we would want that to be 
completely removed in the future.

Cheers,
Gordon

On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi,

I recently had to do a code update of a long running Flink Stream job (1.3.2) 
and on the restart from the savepoint I had to deal with:

java.lang.IllegalStateException: Could not initialize keyed state backend.

Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local class 
incompatible: stream classdesc serialVersionUID = 8728793377341765980, local 
class serialVersionUID = -4253404384162522764

because I have changed a method used to convert the Event to a Cassandra 
writable Tuple (in particular, I changed the return type from Tuple10 to 
Tuple11, after adding a field). I reverted those changes back since it wasn't 
much of a problem per se.

Now, I understand the root cause of this issue, and I wanted to ask if there are 
any "best practices" to prevent this kind of issue without losing the state 
of the job by restarting it from the very beginning.

--
Federico D'Ambrosio

How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
Hi,

I recently had to do a code update of a long running Flink Stream job
(1.3.2) and on the restart from the savepoint I had to deal with:

java.lang.IllegalStateException: Could not initialize keyed state backend.

Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
class incompatible: stream classdesc serialVersionUID = 8728793377341765980,
local class serialVersionUID = -4253404384162522764

because I have changed a method used to convert the Event to a Cassandra
writable Tuple (in particular, I changed the return type from Tuple10 to
Tuple11, after adding a field). I reverted those changes back since it
wasn't much of a problem per se.

Now, I understand the root cause of this issue, and I wanted to ask if there
are any "best practices" to prevent this kind of issue without losing the
state of the job by restarting it from the very beginning.

-- 
Federico D'Ambrosio


Re: Flink 1.2.0->1.3.2 TaskManager reporting to JobManager

2017-11-28 Thread Nico Kruber
Hi Regina,
can you explain a bit more about what you are trying to do and how this is
set up? I quickly tried to reproduce it locally by starting a cluster and
could not see this behaviour.

Also, can you try to increase the log level to INFO and see whether you
see anything suspicious in the logs?


Nico

On 28/11/17 00:19, Chan, Regina wrote:
> Hi,
> 
>  
> 
> As I moved from Flink 1.2.0 to 1.3.2 I noticed that the TaskManager may
> have all tasks with FINISHED but then take about 2-3 minutes before the
> Job execution switches to FINISHED. What is it doing that’s taking this
> long? This was a parallelism = 1 case…
> 
>  
> 
> Regina Chan
> 
> Goldman Sachs – Enterprise Platforms, Data Architecture
> 
> 30 Hudson Street, 37th floor | Jersey City, NY 07302 | (212) 902-5697
> 
>  
> 





Re: S3 Access in eu-central-1

2017-11-28 Thread Stephan Ewen
Got a pointer from Steve that this is answered on Stack Overflow here:
https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version

Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller
footprint, compatible across Hadoop versions, and based on a later s3a and
AWS SDK. In that connector, it should work out of the box because it uses a
later AWS SDK. You can also use it with earlier Hadoop versions because
dependencies are relocated, so it should not clash/conflict.




On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen  wrote:

> Hi!
>
> The endpoint config entry looks correct.
> I was looking at this issue to see if there are pointers to anything else,
> but it looks like the explicit endpoint entry is the most important thing:
> https://issues.apache.org/jira/browse/HADOOP-13324
>
> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for pulling
> you in again - listening and learning still about the subtle bits and
> pieces of S3).
> @Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or only in
> Hadoop 2.8?
>
> Best,
> Stephan
>
>
> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn  wrote:
>
>> Hey,
>> can anyone give a hint? Does anyone have flink running with an S3 Bucket
>> in Frankfurt/eu-central-1 and can share his config and setup?
>>
>> Thanks,
>> Dominik
>>
>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de wrote:
>>
>> Hey everyone,
>> I've been trying for hours to get Flink 1.3.2 (downloaded for Hadoop 2.7) to
>> snapshot/checkpoint to an S3 bucket which is hosted in the eu-central-1
>> region. Everything works fine for other regions. I'm running my job on a
>> JobTracker in local mode. I googled around and found several hints,
>> most of them saying that setting `fs.s3a.endpoint` should solve it. It
>> doesn't. I'm also sure that the core-site.xml (see below) is picked up: if
>> I put garbage into the endpoint, I receive a hostname-not-found error.
>>
>> The exception I'm getting is:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS
>> Service: Amazon S3, AWS Request ID: 432415098B0994BC, AWS Error Code: null,
>> AWS Error Message: Bad Request, S3 Extended Request ID:
>> 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEH
>> XjDb3F9AniXgsBQ=
>>
>> I read the AWS FAQ but I don't think that
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
>> applies to me as I'm not running the NativeFileSystem.
>>
>> I suspect this is related to the v4 signing protocol which is required
>> for S3 in Frankfurt. Could it be that the aws-sdk version is just too old?
>> I tried to play around with it but the hadoop adapter is incompatible with
>> newer versions.
>>
>> I have the following core-site.xml:
>>
>> <configuration>
>>   <property>
>>     <name>fs.s3.impl</name>
>>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>   </property>
>>   <property>
>>     <name>fs.s3a.buffer.dir</name>
>>     <value>/tmp</value>
>>   </property>
>>   <property>
>>     <name>fs.s3a.access.key</name>
>>     <value>something</value>
>>   </property>
>>   <property>
>>     <name>fs.s3a.secret.key</name>
>>     <value>wont-tell</value>
>>   </property>
>>   <property>
>>     <name>fs.s3a.endpoint</name>
>>     <value>s3.eu-central-1.amazonaws.com</value>
>>   </property>
>> </configuration>
>> Here is my lib folder with the versions of the aws-sdk and the hadoop-aws
>> integration:
>> -rw-------  1 root root  11.4M Mar 20  2014 aws-java-sdk-1.7.4.jar
>> -rw-r--r--  1 1005 1006  70.0M Aug  3 12:10 flink-dist_2.11-1.3.2.jar
>> -rw-rw-r--  1 1005 1006  98.3K Aug  3 12:07 flink-python_2.11-1.3.2.jar
>> -rw-r--r--  1 1005 1006  34.9M Aug  3 11:58 flink-shaded-hadoop2-uber-1.3.2.jar
>> -rw-------  1 root root 100.7K Jan 14  2016 hadoop-aws-2.7.2.jar
>> -rw-------  1 root root 414.7K May 17  2012 httpclient-4.2.jar
>> -rw-------  1 root root 218.0K May  1  2012 httpcore-4.2.jar
>> -rw-rw-r--  1 1005 1006 478.4K Jul 28 14:50 log4j-1.2.17.jar
>> -rw-rw-r--  1 1005 1006   8.7K Jul 28 14:50 slf4j-log4j12-1.7.7.jar
>>
>> Can anyone give me any hints?
>>
>> Thanks,
>> Dominik
>>
>>
>


Re: Missing checkpoint when restarting failed job

2017-11-28 Thread Gerard Garcia
I've been monitoring the task and checkpoint 1 never gets deleted. Right
now we have:

chk-1  chk-1222  chk-326  chk-329  chk-357  chk-358  chk-8945  chk-8999
chk-9525  chk-9788  chk-9789  chk-9790  chk-9791

I made the task fail and it recovered without problems, so for now I would
say that the problem was with the distributed file system, or that the chk-1
folder somehow got deleted by something external to Flink. If I see the
problem again, I will try to get more information.
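
(For completeness: assuming the default log4j setup shipped with Flink 1.3, the trace logging Stefan asks for below can be enabled with a log4j.properties entry like the following.)

```
log4j.logger.org.apache.flink.runtime.state.SharedStateRegistry=TRACE
```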

Thanks,

Gerard

On Tue, Nov 21, 2017 at 4:27 PM, Stefan Richter  wrote:

> Ok, thanks for trying to reproduce this. If possible, could you also
> activate trace-level logging for class 
> org.apache.flink.runtime.state.SharedStateRegistry?
> In case the problem occurs, this would greatly help to understand what was
> going on.
>
> > Am 21.11.2017 um 15:16 schrieb gerardg :
> >
> >> where exactly did you read many times that incremental checkpoints
> cannot
> > reference files from previous
> >> checkpoints, because we would have to correct that information. In fact,
> >> this is how incremental checkpoints work.
> >
> > My fault, I read it in some other posts in the mailing list but now that
> I
> > read it carefully it meant savepoints not checkpoints.
> >
> >> Now for this case, I would consider it extremely unlikely that a
> >> checkpoint 1620 would still reference a checkpoint 1,
> >> in particular if the files for that checkpoint are already deleted,
> which
> >> should only happen if it is no longer
> >> referenced. Which version of Flink are you using and what is your
> >> distributed filesystem? Is there any way to
> >> reproduce the problem?
> >
> > We are using Flink version 1.3.2 and GlusterFS.  There are usually a few
> > checkpoints around at the same time, for example right now:
> >
> > chk-1  chk-26  chk-27  chk-28  chk-29  chk-30  chk-31
> >
> > I'm not sure how to reproduce the problem but I'll monitor the folder to
> see
> > when chk-1 gets deleted and try to make the task fail when that happens.
> >
> > Gerard
> >
> > Gerard
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: How to perform efficient DataSet reuse between iterations

2017-11-28 Thread Fabian Hueske
Hi,

by calling result.count(), you compute the complete plan from the beginning
and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with
about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd
step.
If you don't call count on the intermediate steps, you can compute the 4th
step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you
need to touch code at many places such as the API, optimizer, runtime,
scheduling, etc.
The documentation you found should still be applicable. There haven't been
major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra :

> Hello Fabian,
>
> Thank you for the reply.
> I was hoping the situation had in fact changed.
>
> As far as I know, I am not calling execute() directly even once - it is
> being called implicitly by simple DataSink elements added to the plan
> through count():
>
> System.out.println(String.format("%d-th graph algorithm produced %d
> elements. (%d.%d s).",
> executionCounter,
> *result.count()*, // this would trigger
> execution...
> env.getLastJobExecutionResult(
> ).getNetRuntime(TimeUnit.SECONDS),
> env.getLastJobExecutionResult(
> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>
>
> I have taken a look at Flink's code base (e.g. how the dataflow dag is
> processed with classes such as  
> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure
> on the most direct way to achieve this.
> Perhaps I missed some online documentation that would help to get a grip
> on how to contribute to the different parts of Flink?
>
> I did find some information which hints at implementing this sort of thing
> (such as adding custom operators) but it was associated to an old version
> of Flink:
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
> However, as far as I know there is no equivalent page in the current
> online stable or snapshot documentation.
>
> What would be the best way to go about this?
>
> It really seems that the DataSet stored in the result variable is always
> representing an increasing sequence of executions and not just the results
> of the last execution.
>
>
>
> Best regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>
> On 27 November 2017 at 22:56, Fabian Hueske  wrote:
>
>> Hi Miguel,
>>
>> I'm sorry but AFAIK, the situation has not changed.
>>
>> Is it possible that you are calling execute() multiple times?
>> In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd
>> graph is computed.
>> That would explain the increasing execution time of 15 seconds.
>>
>> Best, Fabian
>>
>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra :
>>
>>> Hello,
>>>
>>> I'm facing a problem in an algorithm where I would like to constantly
>>> update a DataSet representing a graph, perform some computation, output
>>> one or more DataSink (such as a file on the local system) and then
>>> reuse the DataSet for a next iteration.
>>> ​I want to avoid spilling the results to disk at the end of an iteration
>>> and to read it back in the next iterations - the graph is very big and I do
>>> not wish to incur that time overhead.
>>> I want to reuse the full result DataSet of each iteration in the next
>>> one and I want to save to disk a small percentage of the produced
>>> DataSet for each iteration.
>>> The space complexity is rather constant - the number of edges in the
>>> graph increases by only 100 between iterations (which is an extremely low
>>> percentage of the original graph's edges) and is obtained using
>>> env.fromCollection(edgesToAdd).
>>>
>>> Although I am using Flink's Gelly API for graphs, I have no problem
>>> working directly with the underlying vertex and edge DataSet elements.​
>>>
>>> Two ways to do this occur to me, but it seems both are currently not
>>> supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion
>>> [1]:
>>>
>>> «Unfortunately, it is not currently possible to output intermediate
>>> results from a bulk iteration. You can only output the final result at the
>>> end of the iteration. Also, as you correctly noticed, Flink cannot
>>> efficiently unroll a while-loop or for-loop, so that won't work either.»
>>>
>>> *1.* I thought I could create a bulk iteration, perform the computation
>>> and between iterations, output the result to the file system.
>>> However, this is not possible, as per Vasia's answer, and produces the
>>> following exception on execution when I try (for example, to calculate a
>>> centrality metric

Re: Non-intrusive way to detect which type is using kryo ?

2017-11-28 Thread Timo Walther

Hi Kien,

at the moment I'm working on some improvements to the type system that 
will make it easier to tell if a type is a POJO or not. I have some 
utility in mind like `ensurePojo(MyType.class)` that would throw an 
exception with a reason why this type must be treated as a generic type.


Would this help in your case?

Regards,
Timo

On 11/28/17 at 2:40 AM, Kien Truong wrote:

Hi,

Is there any way to log only when the Kryo serializer is used? It's a 
pain to disable generic types and then try to solve the exceptions one by one.


Best regards,
Kien





Re: Non-intrusive way to detect which type is using kryo ?

2017-11-28 Thread Antoine Philippot
Hi Kien,

The only way I found is to add this line at the beginning of the
application to detect Kryo serialization:
`com.esotericsoftware.minlog.Log.set(Log.LEVEL_DEBUG)`
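
A small self-contained sketch of where that line goes, together with the stricter (and more intrusive) alternative of failing fast whenever a type falls back to Kryo:

```java
import com.esotericsoftware.minlog.Log;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoDetection {

    public static void main(String[] args) throws Exception {
        // Option 1 (non-intrusive): make Kryo log what it serializes at DEBUG level.
        Log.set(Log.LEVEL_DEBUG);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 2 (intrusive): throw an exception instead of silently using Kryo.
        // env.getConfig().disableGenericTypes();

        // Trivial placeholder pipeline; the real job goes here.
        env.fromElements(1, 2, 3).print();

        env.execute("kryo detection example");
    }
}
```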

Antoine

On Tue, 28 Nov 2017 at 02:41, Kien Truong wrote:

> Hi,
>
> Is there any way to log only when the Kryo serializer is used? It's a pain to
> disable generic types and then try to solve the exceptions one by one.
>
> Best regards,
> Kien
>


Re: How to perform efficient DataSet reuse between iterations

2017-11-28 Thread Miguel Coimbra
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is
being called implicitly by simple DataSink elements added to the plan
through count():

System.out.println(String.format("%d-th graph algorithm produced %d
elements. (%d.%d s).",
executionCounter,
*result.count()*, // this would trigger
execution...
env.getLastJobExecutionResult(
).getNetRuntime(TimeUnit.SECONDS),
env.getLastJobExecutionResult(
).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is
processed with classes such as
org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure
on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on
how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing
(such as adding custom operators) but it was associated to an old version
of Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
However, as far as I know there is no equivalent page in the current online
stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always
representing an increasing sequence of executions and not just the results
of the last execution.



Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra

On 27 November 2017 at 22:56, Fabian Hueske  wrote:

> Hi Miguel,
>
> I'm sorry but AFAIK, the situation has not changed.
>
> Is it possible that you are calling execute() multiple times?
> In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd
> graph is computed.
> That would explain the increasing execution time of 15 seconds.
>
> Best, Fabian
>
> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra :
>
>> Hello,
>>
>> I'm facing a problem in an algorithm where I would like to constantly
>> update a DataSet representing a graph, perform some computation, output
>> one or more DataSink (such as a file on the local system) and then reuse
>> the DataSet for a next iteration.
>> ​I want to avoid spilling the results to disk at the end of an iteration
>> and to read it back in the next iterations - the graph is very big and I do
>> not wish to incur that time overhead.
>> I want to reuse the full result DataSet of each iteration in the next
>> one and I want to save to disk a small percentage of the produced DataSet
>> for each iteration.
>> The space complexity is rather constant - the number of edges in the
>> graph increases by only 100 between iterations (which is an extremely low
>> percentage of the original graph's edges) and is obtained using
>> env.fromCollection(edgesToAdd).
>>
>> Although I am using Flink's Gelly API for graphs, I have no problem
>> working directly with the underlying vertex and edge DataSet elements.​
>>
>> Two ways to do this occur to me, but it seems both are currently not
>> supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion
>> [1]:
>>
>> «Unfortunately, it is not currently possible to output intermediate
>> results from a bulk iteration. You can only output the final result at the
>> end of the iteration. Also, as you correctly noticed, Flink cannot
>> efficiently unroll a while-loop or for-loop, so that won't work either.»
>>
>> *1.* I thought I could create a bulk iteration, perform the computation
>> and between iterations, output the result to the file system.
>> However, this is not possible, as per Vasia's answer, and produces the
>> following exception on execution when I try (for example, to calculate a
>> centrality metric for every vertex and dump the results to disk), as
>> expected based on that information:
>>
>> org.apache.flink.api.common.InvalidProgramException: A data set that is
>> part of an iteration was used as a sink or action. Did you forget to close
>> the iteration?
>>
>> *2.* Using a for loop in my own program and triggering sequential Flink
>> job executions.
>> Problem: in this scenario, while I am able to use a DataSet produced in
>> an iteration's Flink job (and dump the desired output information to disk)
>> and pass it to the next Flink job, the computation time increases
>> constantly:
>> (I also tried manually starting a session which is kept open with
>> env.startNewSession() before the loop - no impact)
>>
>> ​​
>> Initial graph has 33511 vertices and 411578 edges.
>> Added 113 vertices and 100 edges.
>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>> 1-th graph algorithm produced 33524 elements. *(20.96 s)*.
>> Added 222 vertices and 200 edges.
>> 2-th graph now ha