Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-01 Thread Hao Sun
I am trying to figure out how to use S3 as state storage.
The recommended way is
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended

Seems like I only have to do two things:
*1. Put flink-s3-fs-presto to the lib*
*2. Configure *

s3.access-key: your-access-key
s3.secret-key: your-secret-key
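
For reference, a minimal sketch of those two steps, assuming the jar shipped in the
distribution's opt/ directory and a hypothetical install path of /opt/flink:

# step 1: copy the shaded Presto S3 filesystem into lib/
cp /opt/flink/opt/flink-s3-fs-presto-1.5.0.jar /opt/flink/lib/

# step 2: credentials in conf/flink-conf.yaml
s3.access-key: your-access-key
s3.secret-key: your-secret-key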


But I see this exception: ClassNotFoundException:
NativeS3FileSystem/S3AFileSystem Not Found

https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency

And it is suggested to add more libs.
So I am confused here: is there a step 3 needed? Isn't the Presto jar
all self-contained?

Thanks


Re: ConcurrentModificationException while accessing managed keyed state

2018-06-01 Thread sihua zhou

Hi Garvit,

this is unexpected, could you please provide more information about this?

- which Flink version are you using?
- what state backend are you using?
- are you using incremental checkpoints (in case you use the RocksDB backend)?
- did you create a custom thread to operate on the state?

The exception log would also definitely help a lot if you could share it with us.

Best, Sihua


On 06/02/2018 12:08, Garvit Sharma wrote:
Hi,

I have a use case where I am keeping the keyed state in ProcessFunction. 

Key: Integer personId;

/**
 * The data type stored in the state
 */
public class PersonDetails {
    public long count;
    public long lastModified;
}

I have encountered a lot of ConcurrentModificationException. 

I thought Flink processes all the operators on a keyed stream in a single
thread, but it seems like the operators are being accessed by multiple threads.

If I get such an exception, the data coming from Kafka would be consumed
without updating the internal state, which makes me lose data.

Please help me in handling the case according to my use case.

Thanks,

--

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.


ConcurrentModificationException while accessing managed keyed state

2018-06-01 Thread Garvit Sharma
Hi,

I have a use case where I am keeping the keyed state in ProcessFunction.

Key: Integer personId;

/**
 * The data type stored in the state
 */public class PersonDetails {
public long count;
public long lastModified;}


I have encountered a lot of ConcurrentModificationException.

I thought Flink processes all the operators on a keyed stream in a single
thread, but it seems like the operators are being accessed by multiple threads.

If I get such an exception, the data coming from Kafka would be consumed
without updating the internal state, which makes me lose data.
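
For reference, a minimal sketch of the kind of function described above, with every
state access confined to processElement(), i.e. to the operator's single task thread
(PersonEvent is a hypothetical input type; PersonDetails is the class shown above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PersonDetailsFunction extends ProcessFunction<PersonEvent, PersonDetails> {

    private transient ValueState<PersonDetails> details;

    @Override
    public void open(Configuration parameters) {
        details = getRuntimeContext().getState(
                new ValueStateDescriptor<>("person-details", PersonDetails.class));
    }

    @Override
    public void processElement(PersonEvent event, Context ctx, Collector<PersonDetails> out)
            throws Exception {
        // All keyed state access happens here, on the operator's task thread;
        // no user-created threads touch the state.
        PersonDetails current = details.value();
        if (current == null) {
            current = new PersonDetails();
        }
        current.count++;
        current.lastModified = System.currentTimeMillis();
        details.update(current);
        out.collect(current);
    }
}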

Please help me in handling the case according to my use case.

Thanks,

-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Flink 1.5, failed to instantiate S3 FS

2018-06-01 Thread Hao Sun
I cannot find anywhere that I have configured 100M, so I am not sure why I get this failure.
This is in my dev Docker env; the same configuration file worked well with 1.3.2.



= Log 
Caused by: org.apache.flink.util.FlinkException: Failed to submit job
aa75905062dd0487034bb9d8b6617dc2.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID
counter: Cannot instantiate file system for URI:
s3a://zendesk-dev-orca-fps/pod0/checkpoints-meta
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:253)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:495)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:151)
... 26 more
Caused by: java.io.IOException: Cannot instantiate file system for URI:
s3a://zendesk-dev-orca-fps/pod0/checkpoints-meta
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:61)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:247)
... 33 more
*Caused by: java.lang.NumberFormatException: For input string: "100M"*
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at
org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 40 more
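
One possible reading of the last cause (an assumption, not something confirmed in this
log): S3AFileSystem.initialize reads its multipart settings via Configuration.getLong,
and Hadoop 2.7.x cannot parse human-readable values such as "100M" for keys like
fs.s3a.multipart.size; it only accepts plain byte counts. If that is where the "100M"
comes from, a workaround sketch would be to spell the value out in bytes in the Hadoop
configuration that Flink picks up, e.g.:

<!-- hypothetical core-site.xml entry: plain bytes instead of "100M" -->
<property>
  <name>fs.s3a.multipart.size</name>
  <value>104857600</value>
</property>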


Re: Ask for SQL using kafka in Flink

2018-06-01 Thread Rong Rong
Hi Radhya,

Can you provide which Flink version you are using? Based on the latest
FLINK 1.5 release, Kafka09JsonTableSource takes:

/**
 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
 *
 * @param topic   Kafka topic to consume.
 * @param properties  Properties for the Kafka consumer.
 * @param tableSchema The schema of the table.
 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 */

Also, in your type definition TypeInformation typeInfo2 = Types.ROW(...), the
arguments seem to have different lengths for the schema names and the types.
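
For example, a sketch of a definition where both arrays line up (which fields and
types you actually need is of course up to you):

// two field names and two matching field types
TypeInformation<Row> typeInfo2 = Types.ROW(
    new String[] { "iotdevice", "sensorID" },
    new TypeInformation<?>[] { Types.STRING(), Types.STRING() }
);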

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal  wrote:

> Hi,
>
> Could anyone help me to solve this problem
>
>
> /Exception in thread "main" java.lang.Error: Unresolved compilation
> problem:
> The constructor Kafka09JsonTableSource(String, Properties,
> TypeInformation) is undefined
> /
> *--This is the code *
> public class FlinkKafkaSQL {
> public static void main(String[] args) throws Exception {
> // Read parameters from command line
> final ParameterTool params = ParameterTool.fromArgs(args);
>
> if(params.getNumberOfParameters() < 5) {
> System.out.println("\nUsage: FlinkReadKafka " +
>"--read-topic  " +
>"--write-topic  " +
>"--bootstrap.servers  " +
>"zookeeper.connect" +
>"--group.id ");
> return;
> }
>
> // setup streaming environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 1));
> env.enableCheckpointing(30); // 300 seconds
> env.getConfig().setGlobalJobParameters(params);
>
> StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
> // specify JSON field names and types
>
>
> TypeInformation typeInfo2 = Types.ROW(
> new String[] { "iotdevice", "sensorID" },
> new TypeInformation[] { Types.STRING()}
> );
>
> // create a new tablesource of JSON from kafka
> KafkaJsonTableSource kafkaTableSource = new
> Kafka09JsonTableSource(
> params.getRequired("read-topic"),
> params.getProperties(),
> typeInfo2);
>
> // run some SQL to filter results where a key is not null
> String sql = "SELECT sensorID " +
>  "FROM iotdevice ";
> tableEnv.registerTableSource("iotdevice", kafkaTableSource);
> Table result = tableEnv.sql(sql);
>
> // create a partition for the data going into kafka
> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>
> // create new tablesink of JSON to kafka
> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
> params.getRequired("write-topic"),
> params.getProperties(),
> partition);
>
> result.writeToSink(kafkaTableSink);
>
> env.execute("FlinkReadWriteKafkaJSON");
> }
> }
>
>
> *This is the dependencies  in pom.xml*
>
> 
> 
> org.apache.flink
> flink-java
> 1.3.0
> 
> 
> org.apache.flink
> flink-streaming-java_2.11
> 1.3.0
> 
> 
> org.apache.flink
> flink-clients_2.11
> 1.3.0
> 
> 
> org.apache.flink
> flink-connector-kafka-0.9
>
> 1.3.0
> 
> 
> org.apache.flink
> flink-table_2.11
> 1.3.0
> 
> 
> org.apache.flink
> flink-core
> 1.3.0
> 
> 
> org.apache.flink
> flink-streaming-
> scala_2.11
> 1.3.0
> 
> 
>
>
> Regards.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Looking for a working POM file example for EMR cluster

2018-06-01 Thread Sandybayev, Turar (CAI - Atlanta)
Thanks Georgi,

I ended up chucking the POM example from the AWS reference architecture and 
re-generating a new POM from the Flink 1.4 archetype, then logging into a master 
node via SSH and submitting the job directly. Using Steps on the EMR Console 
doesn’t seem to be quite the same thing, I guess.

Thanks for your help!!

Turar

From: Georgi Stoyanov 
Date: Friday, June 1, 2018 at 2:18 AM
To: "Sandybayev, Turar (CAI - Atlanta)" , 
"user@flink.apache.org" 
Subject: RE: Looking for a working POM file example for EMR cluster




Hi,



Did you check solutions from here - 
https://stackoverflow.com/questions/48904881/could-not-resolve-substitution-to-a-value-akka-stream-materializer-in-aws-la



Regards,

Georgi Stoyanov




From: Sandybayev, Turar (CAI - Atlanta) 
Sent: Thursday, May 31, 2018 11:23:22 PM
To: user@flink.apache.org
Subject: Looking for a working POM file example for EMR cluster

Hi,

I'm looking for a sample POM file that works when running on EMR cluster. I'm 
new to Flink and EMR, so I'm simply following AWS EMR documentation on Flink 
and I am creating a Step and submitting my program JAR file. My program is just 
a slight modification of the Wikipedia example.

I was trying to follow an example from AWS reference architecture for their 
Taxi events example: 
https://github.com/aws-samples/flink-stream-processing-refarch/blob/master/flink-taxi-stream-processor/pom.xml

However, I've been seeing various errors having to do with dependencies and 
ClassNotFoundExceptions for basic common Flink dependencies. I tried removing 
excludes from the maven-shade-plugin section of the POM file from the reference 
architecture, and now I'm seeing the following exception:

Exception in thread "main" 
com.typesafe.config.ConfigException$UnresolvedSubstitution: Could not resolve 
substitution to a value: ${akka.stream.materializer}

If I run a local Flink cluster and submit my JAR, I'm not seeing any issues 
with pretty much any way I modify the POM file. I would greatly appreciate if 
someone can point me to a working POM example.

Thanks!
Turar




Re: Multiple Task Slots support in Flink 1.5

2018-06-01 Thread Till Rohrmann
For standalone mode it works for both the legacy and the new mode. Only when
using YARN or Mesos can it happen that you temporarily allocate too many
containers. The unused containers will, however, be freed if they idle for too
long.

Cheers,
Till

On Fri, Jun 1, 2018, 21:52 Abdul Qadeer  wrote:

> Thank you for creating this Till, so as I understand this won't be
> supported (for standalone cluster mode) even if legacy mode is enabled?
>
> On Fri, Jun 1, 2018 at 6:21 AM, Till Rohrmann 
> wrote:
>
>> FYI: https://issues.apache.org/jira/browse/FLINK-9455 tracks the problem
>> with the multi slot support for the SlotManager.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 1, 2018 at 10:10 AM, Abdul Qadeer 
>> wrote:
>>
>>> Hi Kien!
>>>
>>> Thank you for sharing your experience. Do you have Flink deployed in
>>> Standalone Cluster mode or K8S/Docker mode?
>>>
>>> On Fri, 1 Jun 2018 at 00:55, Fabian Hueske  wrote:
>>>
 Hi,

 The release notes state that "multiple slots are not *fully* supported".

 In Flink 1.5.0, the configured number of slots is ignored when
 requesting containers for TaskManagers from a resource manager, i.e., Flink
 assumes TMs with 1 slot.
 Hence, Flink request too many containers and starts too many TMs, but
 each TM is started with the correct number of slots.
 All unused containers are returned after a configurable time out.

 The problem can be prevented by configuring 1 slot per TM.

 Best, Fabian

 2018-05-31 14:12 GMT+02:00 Kien Truong :

> Hi,
>
> We're using multiple slots per TaskManager with legacy mode, and
> everything works fine.
>
> For the new default mode, it also seems to works for us, so I'm not
> sure what is not supported. May be someone from Flink team could clarify.
>
>
> Best regards,
>
> Kien
>
>
> On 5/31/2018 4:26 AM, Abdul Qadeer wrote:
>
> Hi!
>
> I came across the following point in release notes
> 
> of 1.5 version:
>
> "The allocation of TaskManagers with multiple slots is not fully
> supported yet."
>
> Does this mean the support for it will come as a patch for 1.5? or
> will it be in the next stable release?
> If I use legacy mode, will that support multiple slots per
> TaskManager?, or is it only the deployment change that will get affected?
>
>

>>
>


Re: Multiple Task Slots support in Flink 1.5

2018-06-01 Thread Abdul Qadeer
Thank you for creating this Till, so as I understand this won't be
supported (for standalone cluster mode) even if legacy mode is enabled?

On Fri, Jun 1, 2018 at 6:21 AM, Till Rohrmann  wrote:

> FYI: https://issues.apache.org/jira/browse/FLINK-9455 tracks the problem
> with the multi slot support for the SlotManager.
>
> Cheers,
> Till
>
> On Fri, Jun 1, 2018 at 10:10 AM, Abdul Qadeer 
> wrote:
>
>> Hi Kien!
>>
>> Thank you for sharing your experience. Do you have Flink deployed in
>> Standalone Cluster mode or K8S/Docker mode?
>>
>> On Fri, 1 Jun 2018 at 00:55, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> The release notes state that "multiple slots are not *fully* supported".
>>>
>>> In Flink 1.5.0, the configured number of slots is ignored when
>>> requesting containers for TaskManagers from a resource manager, i.e., Flink
>>> assumes TMs with 1 slot.
>>> Hence, Flink request too many containers and starts too many TMs, but
>>> each TM is started with the correct number of slots.
>>> All unused containers are returned after a configurable time out.
>>>
>>> The problem can be prevented by configuring 1 slot per TM.
>>>
>>> Best, Fabian
>>>
>>> 2018-05-31 14:12 GMT+02:00 Kien Truong :
>>>
 Hi,

 We're using multiple slots per TaskManager with legacy mode, and
 everything works fine.

 For the new default mode, it also seems to works for us, so I'm not
 sure what is not supported. May be someone from Flink team could clarify.


 Best regards,

 Kien


 On 5/31/2018 4:26 AM, Abdul Qadeer wrote:

 Hi!

 I came across the following point in release notes
 
 of 1.5 version:

 "The allocation of TaskManagers with multiple slots is not fully
 supported yet."

 Does this mean the support for it will come as a patch for 1.5? or will
 it be in the next stable release?
 If I use legacy mode, will that support multiple slots per
 TaskManager?, or is it only the deployment change that will get affected?


>>>
>


Re: TimerService/Watermarks and Checkpoints

2018-06-01 Thread Narayanan Arunachalam
Yeah, that's my observation too. Basically, small chunks of late data can
add up quickly when data is read at a faster rate.

On a related note, I would expect that if there is no late data produced in
Kafka, then regardless of the rate at which the data is read, this problem
should not occur.

To take care of processing the late data, I am now leveraging the watermark
time as the baseline to set up the timers in the process function, as opposed
to using the time on the event itself. That way, the timer will fire with
respect to the progress of the watermark. Otherwise, when the time on the event
is used as the baseline, the timer will fire right away for late data, because
the watermark will already have moved ahead.

*Here is an example code from a ProcessFunction impl:*

  private def registerTimers(
ctx: ProcessFunction[TraceEvent, Trace]#Context
  ) = {
val timer = ctx.timerService()
timer.registerEventTimeTimer(
  *timer.currentWatermark()* + timeoutMs
  // using this ^^ instead of traceEvent.getTimestamp() + timeoutMs
)
  }

This approach of using the watermark as the baseline for the timer also keeps the
state size small. Otherwise, the state is not cleared until the watermark
crosses traceEvent.getTimestamp() + timeoutMs.


Regards,
Nara

On Fri, Jun 1, 2018 at 12:22 AM, Fabian Hueske  wrote:

> One explanation would be that during catch-up, data is consumed with
> higher throughput because it's just read from Kafka.
> Hence, you'd also see more late data per minute while the job catches up,
> until it reads data at the rate at which it is produced into Kafka.
>
> Would that explain your observations?
>
> Best, Fabian
>
> 2018-05-30 23:56 GMT+02:00 Narayanan Arunachalam <
> narayanan.arunacha...@gmail.com>:
>
>> Thanks for the explanation. I looked at this metric closely and noticed
>> there are some events arriving in out of order. The hypothesis I have is,
>> when the job is restarted, all of the small out of order chunks add up and
>> show a significant number. The graph below shows the number of out of order
>> events every min. The job was started with new state at 11:53 am and then
>> restarted with the previous checkpoint at 1:24 pm.
>>
>> That said, after restart the out of order events number is very high
>> though :thinking_face:
>>
>>
>>
>>
>>
>> On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske  wrote:
>>
>>> Hi Nara and Sihua,
>>>
>>> That's indeed an unexpected behavior and it would be good to identify
>>> the reason for the late data.
>>>
>>> As Sihua said, watermarks are currently not checkpointed and reset to
>>> Long.MIN_VALUE upon restart.
>>> AFAIK, the main reason why WMs are not checkpointed is that the special
>>> type of operator state that is required for this (union-list state) wasn't
>>> available when the mechanism was implemented.
>>> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>
>>> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
>>> narayanan.arunacha...@gmail.com>:
>>>
 Thanks Sihua. If it's reset to Long.MIN_VALUE I can't explain why
 outOfOrderEvents are reported. Because the event time on the data will
 always be greater than Long.MIN_VALUE.

 Following are the steps to reproduce this scenario.
 - A source to produce events with timestamps that is increasing for
 every event produced
 - Use TimeCharacteristic.EventTime
 - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness
 set to 60s.
 - Enable checkpoints
 - ProcessFunction impl to report a counter to some metrics backend when
 the timestamp of the event is less than currentWatermark
 - No out of order events will be reported initially. After few
 checkpoints are created, cancel and restart the job from a previous
 checkpoint.

 *Note*: The event stream really doesn't have out of order data. Job
 restart from a checkpoint causes this artificial out of order events
 because of the watermark value.

 Regards,
 Nara




 On Tue, May 29, 2018 at 7:54 PM, sihua zhou 
 wrote:

> Hi Nara,
>
> yes, the watermark in TimerService is not covered by the checkpoint;
> every time the job is restarted from a previous checkpoint, it is reset to
> Long.MIN_VALUE. I can see it being a bit tricky to cover it in the checkpoint,
> especially when we need to support rescaling (it seems to be neither a purely
> keyed nor an operator state); maybe @Stefan or @Aljoscha could give you more
> useful information about why it wasn't covered by the checkpoint.
>
> Best, Sihua
>
>
>
> On 05/30/2018 05:44,Narayanan Arunachalam l...@gmail.com>  wrote:
>
> Hi,
>
> Is it possible that the watermark in TimerService is not getting reset when a
> job is restarted from a previous checkpoint? I would expect the watermark
> in a TimerService also to go back in time.

Re: Trigerring Savepoint for the Flink Job

2018-06-01 Thread Rong Rong
Hi Anil,

Glad to know that you upgraded the system to 1.4; from our experience there
are quite a few changes required to adapt to the new deployment model in
1.4, if I remember correctly.
The "run detached" deployment model in AthenaX does not support reattaching
to the job; we use the REST API to do all the subsequent life-cycle management.

There are a couple of ways I can think of to work around this if upgrading to 1.5
is not an option:
- try to use CLI API [1] instead of REST API by replacing the life-cycle
management component in WatchdogPolicy, so that you can trigger savepoints.
- try to modify the deployment model of AthenaX to not use "run detach"
mode by modifying the "YarnClusterDescriptor"

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
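
For reference, a sketch of the savepoint-related CLI calls from [1]; job id and
target directory are placeholders:

# trigger a savepoint for a running job
bin/flink savepoint <jobId> [targetDirectory]

# or cancel the job while taking a savepoint
bin/flink cancel -s [targetDirectory] <jobId>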

Hope this can help your use case.

Thanks,
Rong

On Thu, May 31, 2018 at 8:38 PM, Anil  wrote:

> Thanks for the reply Rong. We had updated Athenax to version 1.4.
>
> I had checked Flink 1.4; its REST endpoint does not support only creating a
> savepoint, it only has cancel-with-savepoint. I think creating a savepoint is
> supported in 1.5. Since we can't upgrade to 1.5 at the moment, we would like
> to find a workaround for now.
>
> Can you tell me how to reattach to a running job in the cluster?
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: How to set UserGroupInformation?

2018-06-01 Thread Rong Rong
Hi Xinyu,

To add to Till's comment, setting `HADOOP_USER_NAME` in your environment is
probably the easiest way if you are using the CLI.
If you are launching the job programmatically, e.g. using
YarnClusterDescriptor [1], there are many ways to set `HADOOP_USER_NAME` as
well; please share more information if you are going down that path.

Alternatively, if you are set on using UserGroupInformation and "yarn" is a
superuser in your cluster, you can also try out the proxy-user approach [2].
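
A minimal sketch of that proxy-user approach, assuming the cluster is configured per
[2] so that the submitting user may impersonate "hbase"; this is illustrative rather
than a drop-in solution:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class WriteAsHbaseUser {
    public static void main(String[] args) throws Exception {
        // The process itself runs as e.g. "yarn"; "hbase" must be an allowed proxy target.
        UserGroupInformation hbase = UserGroupInformation.createProxyUser(
                "hbase", UserGroupInformation.getCurrentUser());
        hbase.doAs((PrivilegedExceptionAction<Void>) () -> {
            // open the HDFS FileSystem and write under /home/hbase/XXX here
            return null;
        });
    }
}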

Hope this helps.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/yarn/YarnClusterDescriptor.html
[2]
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/Superusers.html

On Fri, Jun 1, 2018 at 7:25 AM, Till Rohrmann  wrote:

> Hi,
>
> have you tried exporting `HADOOP_USER_NAME` with the hbase user before
> running your application?
>
> Cheers,
> Till
>
> On Fri, Jun 1, 2018 at 6:31 AM, Xinyu Zhang  wrote:
>
>> Hi all
>>
>> I'm trying to write data to HDFS in directory "/home/hbase/XXX". Only
>> "hbase" user can write to the directory.
>> The problem is: I submit a job to YARN. The job will write data as the "yarn"
>> user, while the "yarn" user is not allowed to write to
>> "/home/hbase/XXX".
>> Is there any method that I can set UserGroupInformation to "hbase"?
>>
>> Thanks!
>>
>> Xinyu Zhang
>>
>
>


[ANNOUNCE] Flink Forward Berlin 2018 - Call for Presentations extended until June 11

2018-06-01 Thread Fabian Hueske
Hi everybody,

Due to popular demand, we've extended the Call for Presentations for Flink
Forward Berlin 2018 by one week.
The call will close on *Monday, June 11* (11:59pm CEST).

Please submit a proposal to present your Flink and Stream Processing use
case, experiences, and best practices in Berlin.
For the first time, Flink Forward will host a dedicated Research track to
share and discuss novel ideas and approaches.

You can submit your talk proposal at
https://flink-forward.org/call-for-presentations-submit-talk/

Best regards,
Fabian

(PC Chair for Flink Forward Berlin 2018)


Ask for SQL using kafka in Flink

2018-06-01 Thread Radhya Sahal
Hi, 

Could anyone help me to solve this problem 


/Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
The constructor Kafka09JsonTableSource(String, Properties,
TypeInformation) is undefined
/
*--This is the code *
public class FlinkKafkaSQL {
public static void main(String[] args) throws Exception {
// Read parameters from command line
final ParameterTool params = ParameterTool.fromArgs(args);

if(params.getNumberOfParameters() < 5) {
System.out.println("\nUsage: FlinkReadKafka " +
   "--read-topic  " +
   "--write-topic  " +
   "--bootstrap.servers  " +
   "zookeeper.connect" +
   "--group.id ");
return;
}

// setup streaming environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
   
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
1));
env.enableCheckpointing(30); // 300 seconds
env.getConfig().setGlobalJobParameters(params);

StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);

// specify JSON field names and types
  

TypeInformation typeInfo2 = Types.ROW(
new String[] { "iotdevice", "sensorID" },
new TypeInformation[] { Types.STRING()}
);

// create a new tablesource of JSON from kafka
KafkaJsonTableSource kafkaTableSource = new
Kafka09JsonTableSource(
params.getRequired("read-topic"),
params.getProperties(),
typeInfo2);

// run some SQL to filter results where a key is not null
String sql = "SELECT sensorID " +
 "FROM iotdevice ";
tableEnv.registerTableSource("iotdevice", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
params.getRequired("write-topic"),
params.getProperties(),
partition);

result.writeToSink(kafkaTableSink);

env.execute("FlinkReadWriteKafkaJSON");
}
}


*This is the dependencies in pom.xml*

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>

Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Submit Flink Job on Yarn cluster to restores its state from it's last savepoint

2018-06-01 Thread Till Rohrmann
Hi Anil,

when submitting the job, simply run `bin/flink run --fromSavepoint
<savepointPath> -p <parallelism> <jobJar>`.
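
For example (savepoint path, parallelism, and jar name are hypothetical):

bin/flink run --fromSavepoint hdfs:///flink/savepoints/savepoint-abc123 -p 4 my-job.jar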

Cheers,
Till

On Fri, Jun 1, 2018 at 4:43 PM, Anil  wrote:

> I am running these Flink job in a Yarn cluster. When the flink job fails
> or I
> restart it I want the Flink job to use the savepoint that was created
> before
> it restarted. How do can restart the Flink job with this savepoint. I am
> using Flink 1.4.2.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: some default config questions

2018-06-01 Thread Till Rohrmann
Hi,

the reason why `jobmanager.execution.failover-strategy` defaults to `full`
is that `individual` only works if you have a completely embarrassingly
parallel job where all operators chain into a single task.

Concerning `taskmanager.jvm-exit-on-oom`: depending on where the
OutOfMemoryError has been thrown, it is sometimes still possible for the
TaskManager to recover from it. However, in the general case you're right
that one should not rely on the JVM being in a consistent state after an OOM,
and it would probably be safer to simply terminate the TM process.
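
For reference, both settings live in flink-conf.yaml; overriding the defaults
discussed above would look like this (values shown only as an illustration, not
a recommendation):

jobmanager.execution.failover-strategy: individual
taskmanager.jvm-exit-on-oom: true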

Cheers,
Till

On Thu, May 31, 2018 at 11:49 AM, makeyang 
wrote:

> why jobmanager.execution.failover-strategy default value is full not
> Individual?
> why taskmanager.jvm-exit-on-oom default value is false not true?
> code is flink 1.5
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: File does not exist prevent from Job manager to start .

2018-06-01 Thread Till Rohrmann
Hi Miki,

could you check whether the files are really no longer stored on HDFS? How
did you terminate the cluster? Simply calling `bin/stop-cluster.sh`? I just
tried it locally and it could recover the job after calling
`bin/start-cluster.sh` again.

What would be helpful are the logs from the initial run of the job. So if
you can reproduce the problem, then this log would be very helpful.

Cheers,
Till

On Thu, May 31, 2018 at 6:14 PM, miki haiat  wrote:

> Hi,
>
> I'm having some weird issue with the JM recovery.
> I'm using HDFS and ZooKeeper for an HA standalone cluster.
>
> I've stopped the cluster and changed some parameters in the Flink conf (memory).
> But now when I start the cluster again I'm getting an error that prevents
> the JM from starting:
> somehow the checkpoint file doesn't exist in HDFS and the JM won't start.
>
> full log JM log file
> 
>
>
>> 2018-05-31 11:57:05,568 ERROR 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
>> - Fatal error occurred in the cluster entrypoint.
>
> Caused by: java.lang.Exception: Cannot set up the user code libraries:
> File does not exist: /flink1.5/ha/default/blob/job_
> 5c545fc3f43d69325fb9966b8dd4c8f3/blob_p-5d9f3be555d3b05f90b5e148235d25
> 730eb65b3d-ae486e221962f7b96e36da18fe1c57ca
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(
> INodeFile.java:72)
>
>
>
>


Re: Flink 1.2.1 - Job initial start time

2018-06-01 Thread Till Rohrmann
Hi Bajaj,

this is currently not possible. Could you please open a JIRA issue for this
feature to be added? I think in the end the change should be to not reset
the CREATED timestamp when calling `ExecutionGraph#restart()`.

Cheers,
Till

On Thu, May 31, 2018 at 6:34 PM, Bajaj, Abhinav 
wrote:

> Adding my team mate.
>
>
>
> ~ Abhinav Bajaj
>
>
>
> *From: *"Bajaj, Abhinav" 
> *Date: *Thursday, May 31, 2018 at 9:30 AM
> *To: *"user@flink.apache.org" 
> *Subject: *Flink 1.2.1 - Job initial start time
>
>
>
> Hi,
>
>
>
> Is there a way to know the initial start time of a job? I am looking for
> the first time job was started and not the time of the restart.
>
>
>
> It seems the “start-time” reported by monitoring API under joboverview
> 
> is reset after a restart.
>
> Also, it seems the “start-time” & “timestamps” under job details
> 
> are also reset when a job is restarted.
>
> I expected the “CREATED” date under “timestamps” to be initial created
> time of the job but it is reset after a job is restarted.
>
>
>
> We are using Flink 1.2.1 and trying to figure out a way to find the
> initial start time, if possible.
>
>
>
> Thanks,
>
> Abhinav Bajaj
>
>
>
>
>
>
>


Submit Flink Job on Yarn cluster to restores its state from it's last savepoint

2018-06-01 Thread Anil
I am running these Flink jobs in a YARN cluster. When a Flink job fails or I
restart it, I want the Flink job to use the savepoint that was created before
it restarted. How can I restart the Flink job with this savepoint? I am
using Flink 1.4.2.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-06-01 Thread Stephan Ewen
A heads up on this front:

  - For state backends during checkpointing, I would suggest to use the
flink-s3-fs-presto, which is quite a bit faster than the flink-s3-fs-hadoop
by avoiding a bunch of unnecessary metadata operations.

  - We have started work on re-writing the Bucketing Sink to make it work
with the shaded S3 filesystems (like flink-s3-fs-presto). We are also
adding a more powerful internal abstraction that uses multipart uploads for
faster incremental persistence of result chunks on checkpoints. This should
be in 1.6, happy to share more as soon as it is out...


On Wed, Feb 7, 2018 at 3:56 PM, Marchant, Hayden 
wrote:

> We actually got it working. Essentially, it's an implementation of
> Hadoop's FileSystem, and it was written with the idea that it can be used with
> Spark (since it has broader adoption than Flink as of now). We managed to
> get it configured, and found the latency to be much lower than with the
> S3 connector. There are a lot fewer copy operations etc. happening
> under the hood when using this native API, which explains the better
> performance.
>
> Happy to provide assistance offline if you're interested.
>
> Thanks
> Hayden
>
> -Original Message-
> From: Edward Rojas [mailto:edward.roja...@gmail.com]
> Sent: Thursday, February 01, 2018 6:09 PM
> To: user@flink.apache.org
> Subject: RE: S3 for state backend in Flink 1.4.0
>
> Hi Hayden,
>
> It seems like a good alternative. But I see it's intended to work with
> spark, did you manage to get it working with Flink ?
>
> I some tests but I get several errors when trying to create a file, either
> for checkpointing or saving data.
>
> Thanks in advance,
> Regards,
> Edward
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to set UserGroupInformation?

2018-06-01 Thread Till Rohrmann
Hi,

have you tried exporting `HADOOP_USER_NAME` with the hbase user before
running your application?

Cheers,
Till

On Fri, Jun 1, 2018 at 6:31 AM, Xinyu Zhang  wrote:

> Hi all
>
> I'm trying to write data to HDFS in directory "/home/hbase/XXX". Only
> "hbase" user can write to the directory.
> The problem is: I submit a job to YARN. The job will write data as the "yarn"
> user, while the "yarn" user is not allowed to write to
> "/home/hbase/XXX".
> Is there any method that I can set UserGroupInformation to "hbase"?
>
> Thanks!
>
> Xinyu Zhang
>


Re: Multiple Task Slots support in Flink 1.5

2018-06-01 Thread Till Rohrmann
FYI: https://issues.apache.org/jira/browse/FLINK-9455 tracks the problem
with the multi slot support for the SlotManager.

Cheers,
Till

On Fri, Jun 1, 2018 at 10:10 AM, Abdul Qadeer  wrote:

> Hi Kien!
>
> Thank you for sharing your experience. Do you have Flink deployed in
> Standalone Cluster mode or K8S/Docker mode?
>
> On Fri, 1 Jun 2018 at 00:55, Fabian Hueske  wrote:
>
>> Hi,
>>
>> The release notes state that "multiple slots are not *fully* supported".
>>
>> In Flink 1.5.0, the configured number of slots is ignored when requesting
>> containers for TaskManagers from a resource manager, i.e., Flink assumes
>> TMs with 1 slot.
>> Hence, Flink request too many containers and starts too many TMs, but
>> each TM is started with the correct number of slots.
>> All unused containers are returned after a configurable time out.
>>
>> The problem can be prevented by configuring 1 slot per TM.
>>
>> Best, Fabian
>>
>> 2018-05-31 14:12 GMT+02:00 Kien Truong :
>>
>>> Hi,
>>>
>>> We're using multiple slots per TaskManager with legacy mode, and
>>> everything works fine.
>>>
>>> For the new default mode, it also seems to works for us, so I'm not sure
>>> what is not supported. May be someone from Flink team could clarify.
>>>
>>>
>>> Best regards,
>>>
>>> Kien
>>>
>>>
>>> On 5/31/2018 4:26 AM, Abdul Qadeer wrote:
>>>
>>> Hi!
>>>
>>> I came across the following point in release notes
>>> 
>>> of 1.5 version:
>>>
>>> "The allocation of TaskManagers with multiple slots is not fully
>>> supported yet."
>>>
>>> Does this mean the support for it will come as a patch for 1.5? or will
>>> it be in the next stable release?
>>> If I use legacy mode, will that support multiple slots per TaskManager?,
>>> or is it only the deployment change that will get affected?
>>>
>>>
>>


Re: Multiple Task Slots support in Flink 1.5

2018-06-01 Thread Abdul Qadeer
Hi Kien!

Thank you for sharing your experience. Do you have Flink deployed in
Standalone Cluster mode or K8S/Docker mode?

On Fri, 1 Jun 2018 at 00:55, Fabian Hueske  wrote:

> Hi,
>
> The release notes state that "multiple slots are not *fully* supported".
>
> In Flink 1.5.0, the configured number of slots is ignored when requesting
> containers for TaskManagers from a resource manager, i.e., Flink assumes
> TMs with 1 slot.
> Hence, Flink request too many containers and starts too many TMs, but each
> TM is started with the correct number of slots.
> All unused containers are returned after a configurable time out.
>
> The problem can be prevented by configuring 1 slot per TM.
>
> Best, Fabian
>
> 2018-05-31 14:12 GMT+02:00 Kien Truong :
>
>> Hi,
>>
>> We're using multiple slots per TaskManager with legacy mode, and
>> everything works fine.
>>
>> For the new default mode, it also seems to works for us, so I'm not sure
>> what is not supported. May be someone from Flink team could clarify.
>>
>>
>> Best regards,
>>
>> Kien
>>
>>
>> On 5/31/2018 4:26 AM, Abdul Qadeer wrote:
>>
>> Hi!
>>
>> I came across the following point in release notes
>> 
>> of 1.5 version:
>>
>> "The allocation of TaskManagers with multiple slots is not fully
>> supported yet."
>>
>> Does this mean the support for it will come as a patch for 1.5? or will
>> it be in the next stable release?
>> If I use legacy mode, will that support multiple slots per TaskManager?,
>> or is it only the deployment change that will get affected?
>>
>>
>


Re: Multiple Task Slots support in Flink 1.5

2018-06-01 Thread Fabian Hueske
Hi,

The release notes state that "multiple slots are not *fully* supported".

In Flink 1.5.0, the configured number of slots is ignored when requesting
containers for TaskManagers from a resource manager, i.e., Flink assumes
TMs with 1 slot.
Hence, Flink requests too many containers and starts too many TMs, but each
TM is started with the correct number of slots.
All unused containers are returned after a configurable timeout.

The problem can be prevented by configuring 1 slot per TM.
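
For example, in flink-conf.yaml (a sketch of the workaround described above):

taskmanager.numberOfTaskSlots: 1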

Best, Fabian

2018-05-31 14:12 GMT+02:00 Kien Truong :

> Hi,
>
> We're using multiple slots per TaskManager with legacy mode, and
> everything works fine.
>
> For the new default mode, it also seems to work for us, so I'm not sure
> what is not supported. Maybe someone from the Flink team could clarify.
>
>
> Best regards,
>
> Kien
>
>
> On 5/31/2018 4:26 AM, Abdul Qadeer wrote:
>
> Hi!
>
> I came across the following point in release notes
> 
> of 1.5 version:
>
> "The allocation of TaskManagers with multiple slots is not fully supported
> yet."
>
> Does this mean the support for it will come as a patch for 1.5? or will it
> be in the next stable release?
> If I use legacy mode, will that support multiple slots per TaskManager?,
> or is it only the deployment change that will get affected?
>
>


Re: TimerService/Watermarks and Checkpoints

2018-06-01 Thread Fabian Hueske
One explanation would be that during catch-up, data is consumed with higher
throughput because it's just read from Kafka.
Hence, you'd also see more late data per minute while the job catches up,
until it reads data at the rate at which it is produced into Kafka.

Would that explain your observations?

Best, Fabian

2018-05-30 23:56 GMT+02:00 Narayanan Arunachalam <
narayanan.arunacha...@gmail.com>:

> Thanks for the explanation. I looked at this metric closely and noticed
> there are some events arriving in out of order. The hypothesis I have is,
> when the job is restarted, all of the small out of order chunks add up and
> show a significant number. The graph below shows the number of out of order
> events every min. The job was started with new state at 11:53 am and then
> restarted with the previous checkpoint at 1:24 pm.
>
> That said, after restart the out of order events number is very high
> though :thinking_face:
>
>
>
>
>
> On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske  wrote:
>
>> Hi Nara and Sihua,
>>
>> That's indeed an unexpected behavior and it would be good to identify the
>> reason for the late data.
>>
>> As Sihua said, watermarks are currently not checkpointed and reset to
>> Long.MIN_VALUE upon restart.
>> AFAIK, the main reason why WMs are not checkpointed is that the special
>> type of operator state that is required for this (union-list state) wasn't
>> available when the mechanism was implemented.
>> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>
>> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
>> narayanan.arunacha...@gmail.com>:
>>
>>> Thanks Sihua. If it's reset to Long.MIN_VALUE I can't explain why
>>> outOfOrderEvents are reported. Because the event time on the data will
>>> always be greater than Long.MIN_VALUE.
>>>
>>> Following are the steps to reproduce this scenario.
>>> - A source to produce events with timestamps that is increasing for
>>> every event produced
>>> - Use TimeCharacteristic.EventTime
>>> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness
>>> set to 60s.
>>> - Enable checkpoints
>>> - ProcessFunction impl to report a counter to some metrics backend when
>>> the timestamp of the event is less than currentWatermark
>>> - No out of order events will be reported initially. After few
>>> checkpoints are created, cancel and restart the job from a previous
>>> checkpoint.
>>>
>>> *Note*: The event stream really doesn't have out of order data. Job
>>> restart from a checkpoint causes this artificial out of order events
>>> because of the watermark value.
>>>
>>> Regards,
>>> Nara
>>>
>>>
>>>
>>>
>>> On Tue, May 29, 2018 at 7:54 PM, sihua zhou  wrote:
>>>
 Hi Nara,

 yes, the watermark in TimerService is not covered by the checkpoint;
 every time the job is restarted from a previous checkpoint, it is reset to
 Long.MIN_VALUE. I can see it being a bit tricky to cover it in the checkpoint,
 especially when we need to support rescaling (it seems to be neither a purely
 keyed nor an operator state); maybe @Stefan or @Aljoscha could give you more
 useful information about why it wasn't covered by the checkpoint.

 Best, Sihua



 On 05/30/2018 05:44,Narayanan Arunachalam>>> l...@gmail.com>  wrote:

 Hi,

 Is it possible the watermark in TimerService not getting reset when a
 job is restarted from a previous checkpoint? I would expect the watermark
 in a TimerService also to go back in time.

 I have the following ProcessFunction implementation.

   override def processElement(
 e: TraceEvent,
 ctx: ProcessFunction[
   TraceEvent,
   Trace
 ]#Context,
 out: Collector[Trace]
   ): Unit = {

 if (e.getAnnotationTime() < ctx.timerService().currentWatermark())
 {
   registry.counter("tracing.outOfOrderEvents").increment()
 } else {
 
 }

 I am noticing the outOfOrderEvents are reported until new events are
 read in to the stream since the last restart.

 Regards,
 Nara


>>>
>>
>


Re: is it OK to PR just for code comment errors?

2018-06-01 Thread makeyang
got it. 
thanks guys



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is Flink:1.5 Docker image broken?

2018-06-01 Thread Cameron Wood
Hello,
My suggestion would be to try testing with Docker locally (maybe using a
specific tag even) to see if you can replicate the problem.

Doing a `docker run --rm -p 8081:8081 flink:latest local` just now I am
able to access the Flink dashboard as expected.

If you're deploying on Kubernetes, this would require you to either
`kubectl proxy ...` or expose it by configuring the appropriate
Service/Ingress/LoadBalancer in order to access the dashboard remotely.
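
For example, a quick way to reach the dashboard without touching any Service
definitions (the pod name is a placeholder):

kubectl port-forward <jobmanager-pod-name> 8081:8081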

Regards,
Cameron.
--



On 31 May 2018 at 08:58, Alexandru Gutan  wrote:

> Well, those are unofficial, so you might raise a corresponding issue on
> GitHub (since the images are there) for that.
>
> On 31 May 2018 at 08:09, Chirag Dewan  wrote:
>
>> Hi,
>>
>> flink:latest docker image doesn't seem to work. I am not able to access
>> the Flink Dashboard after deploying it on Kubernetes.
>>
>> Anyone else facing the issue?
>>
>> Thanks,
>>
>> Chirag
>>
>
>


Re: Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-06-01 Thread Fabian Wollert
I solved it by myself, with the help of some debugging. I used
s3:///mybucket/ but it needs to be s3://mybucket/some_folder ... two slashes,
and a folder also needs to be specified ...
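
A sketch of the earlier configuration in its working form (bucket and folder names
are placeholders):

state.backend=filesystem
state.backend.fs.checkpointdir=s3://mybucket/flink-checkpoints
state.checkpoints.dir=s3://mybucket/flink-checkpoints
state.checkpoints.num-retained=3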
--


*Fabian WollertZalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP 



Am Do., 31. Mai 2018 um 09:31 Uhr schrieb Fabian Wollert :

> I'm running it in docker on EC2, cant use EMR ... yes i followed those
> instructions.
>
> Cheers
>
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fabian.woll...@zalando.de
>
>
> Am Do., 31. Mai 2018 um 03:07 Uhr schrieb Bowen Li :
>
>> Did you run Flink on AWS EMR or somewhere else? Have you read and
>> followed instructions on
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#amazon-web-services-aws
>> ?
>>
>>
>>
>> On Wed, May 30, 2018 at 7:08 AM, Fabian Wollert 
>> wrote:
>>
>>> Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a
>>> filesystem backend. I configured the following:
>>>
>>> state.backend=filesystem
>>> state.backend.fs.checkpointdir=s3:///mybucket/
>>> state.checkpoints.dir=s3:///mybucket/
>>> state.checkpoints.num-retained=3
>>>
>>> I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.
>>>
>>> I get now though the following error message:
>>>
>>> Caused by: java.lang.NullPointerException: null uri host.
>>> at java.util.Objects.requireNonNull(Objects.java:228)
>>> at
>>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
>>> at
>>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
>>> at
>>> org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:133)
>>>
>>> I tried to dig deeper into the source code, but struggled to find
>>>
>>>- what is meant with this URI
>>>- where to configure it
>>>
>>> Can anybody give some advice how to set up the S3 Backend with the new
>>> shaded lib jar?
>>>
>>> Thanks in advance
>>> --
>>>
>>>
>>> *Fabian WollertZalando SE*
>>>
>>> E-Mail: fabian.woll...@zalando.de
>>>
>>> Tamara-Danz-Straße 1
>>> 
>>> 10243 Berlin
>>> 
>>> Fax: +49 (0)30 2759 46 93
>>> E-mail: legalnot...@zalando.co.uk
>>> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
>>> 2000889349
>>>
>>> Management Board:
>>> Robert Gentz, David Schneider, Rubin Ritter
>>>
>>> Chairman of the Supervisory Board:
>>> Lothar Lanz
>>>
>>> Person responsible for providing the contents of Zalando SE acc. to Art.
>>> 55 RStV [Interstate Broadcasting Agreement]: Rubin Ritter
>>> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
>>> VAT registration number: DE 260543043
>>>
>>
>>


RE: Looking for a working POM file example for EMR cluster

2018-06-01 Thread Georgi Stoyanov


Hi,



Did you check solutions from here - 
https://stackoverflow.com/questions/48904881/could-not-resolve-substitution-to-a-value-akka-stream-materializer-in-aws-la
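
For reference, the fix discussed there usually comes down to merging Akka's
reference.conf while shading; a sketch of the relevant maven-shade-plugin
transformer (the rest of the plugin configuration is omitted):

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>reference.conf</resource>
</transformer>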



Regards,

Georgi Stoyanov




From: Sandybayev, Turar (CAI - Atlanta) 
Sent: Thursday, May 31, 2018 11:23:22 PM
To: user@flink.apache.org
Subject: Looking for a working POM file example for EMR cluster

Hi,

I'm looking for a sample POM file that works when running on EMR cluster. I'm 
new to Flink and EMR, so I'm simply following AWS EMR documentation on Flink 
and I am creating a Step and submitting my program JAR file. My program is just 
a slight modification of the Wikipedia example.

I was trying to follow an example from AWS reference architecture for their 
Taxi events example: 
https://github.com/aws-samples/flink-stream-processing-refarch/blob/master/flink-taxi-stream-processor/pom.xml

However, I've been seeing various errors having to do with dependencies and 
ClassNotFoundExceptions for basic common Flink dependencies. I tried removing 
excludes from the maven-shade-plugin section of the POM file from the reference 
architecture, and now I'm seeing the following exception:

Exception in thread "main" 
com.typesafe.config.ConfigException$UnresolvedSubstitution: Could not resolve 
substitution to a value: ${akka.stream.materializer}

If I run a local Flink cluster and submit my JAR, I'm not seeing any issues 
with pretty much any way I modify the POM file. I would greatly appreciate if 
someone can point me to a working POM example.

Thanks!
Turar