Flink streaming (1.3.2) KafkaConsumer08 - Unable to retrieve any partitions

2018-01-22 Thread Tarandeep Singh
Hi,

Our flink streaming job that is reading from old version of Kafka keeps
failing (every 9 minutes or so) with this error:

java.lang.RuntimeException: Unable to retrieve any partitions for the
requested topics [extracted-dimensions]. Please check previous log entries
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getKafkaPartitions(FlinkKafkaConsumer08.java:220)

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:381)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


Sometimes it runs for a long time but when it starts to fail, it is
predictable failure every 9 minutes. The topic does exist in the Kafka
cluster.

Can someone help me understand what could be going wrong?

Thank you,
Tarandeep


System.exit() vs throwing exception from the pipeline

2018-01-22 Thread Gordon Weakliem
What's the general advice on calling System.exit() inside an operator, vs
throwing an exception and having the execution environment tear down the
pipeline. Throwing the exception seems cleaner but it does appear that
Flink might do an orderly shutdown with System.exit(). Will the close()
methods be called, file handles closed etc?

-- 
[image: Img]
*  Gordon Weakliem*|  Sr. Software Engineer
  *O *303.493.5490
*  Boulder* | NYC | London




CONFIDENTIALITY. This communication is intended only for the use of the
intended recipient(s) and contains information that is privileged and
confidential. As a recipient of this confidential and proprietary
information, you are prohibited from distributing this information outside
of sovrn. Further, if you are not the intended recipient, please note that
any dissemination of this communication is prohibited. If you have received
this communication in error, please erase all copies of the message,
including all attachments, and please also notify the sender immediately.
Thank you for your cooperation.


Re: Far too few watermarks getting generated with Kafka source

2018-01-22 Thread Eron Wright
I think there's a misconception about `setAutowatermarkInterval`.   It
establishes the rate at which your periodic watermark generator is polled
for the current watermark.   Like most generators,
`BoundedOutOfOrdernessTimestampExtractor` produces a watermark based solely
on observed elements.   Therefore, `setAutowatermarkInterval` does not
compensate for idle sources (see FLINK-5479 and FLINK-5018).

Keep in mind that sources do not re-order emitted elements into event time
order; depending on the source's internals, it might emit elements in a
highly unordered fashion with respect to event time.   For example, the
Kafka consumer processes elements across numerous partitions
simultaneously, and the resultant ordering is highly variable.   When you
use the generic `assignTimestampsAndWatermarks` facility, the assigner is
challenged to make sense of this multiplexed stream of elements.   For this
reason, I would strongly suggest you make use of the Kafka consumer's
support for per-partition assigners, to be able to reason about the
progression of time in each partition independently.

Here's a good diagram of the phenomemon that I'm describing.  Observe how
some elements seem to 'move upward' together, and imagine that they
correspond to one partition.
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102#FIG12

Hope this helps!
Eron



On Mon, Jan 22, 2018 at 2:24 AM, Fabian Hueske  wrote:

> Hi William,
>
> The TsExtractor looks good.
> This sounds like a strange behavior and should not (or only indirectly) be
> related to the Kafka source since the WMs are generated by a separate
> extractor.
>
> - Did you compare the first (and only) generated watermark to the
> timestamps of the records that are produced by the sources?
> It might be far ahead of the timestamps in the records and won't be
> updated because the timestamps of the records are smaller.
>
> - What is the parallelism of the file sources / Kafka source and following
> operators?
> Watermarks can only advance if they advance in all parallel instance of
> the timestamp extractor.
>
> Best, Fabian
>
> 2018-01-18 16:09 GMT+01:00 William Saar :
>
>> Hi,
>> The watermark does not seem to get updated at all after the first one is
>> emitted. We used to get out-of-order warnings, but we changed to job to
>> support a bounded timestamp extractor so we no longer get those warnings.
>>
>> Our timestamp extractor looks like this
>>
>> class TsExtractor[T](time : Time) extends 
>> BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time : Time) {
>> override def extractTimestamp(element: Timestamped[T]): Long = 
>> element.timestamp
>> }
>>
>> Our stream topology starts with a single stream, then we do two separate 
>> flat map and filtering operations on the initial stream to transform data 
>> batches
>> into streams of two different event types. We then 
>> assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20))) 
>> for each event type on both
>> branches before unioning the two branches to a single stream again (the 
>> reason for the split is that the data used to come from two different 
>> topics).
>>
>> William
>>
>>
>>
>>
>> - Original Message -
>> From:
>> "Gary Yao" 
>>
>> To:
>> "William Saar" 
>> Cc:
>> "user" 
>> Sent:
>> Thu, 18 Jan 2018 11:11:17 +0100
>> Subject:
>> Re: Far too few watermarks getting generated with Kafka source
>>
>>
>>
>> Hi William,
>>
>> How often does the Watermark get updated? Can you share your code that
>> generates
>> the watermarks? Watermarks should be strictly ascending. If your code
>> produces
>> watermarks that are not ascending, smaller ones will be discarded. Could
>> it be
>> that the events in Kafka are more "out of order" with respect to event
>> time than
>> in your file?
>>
>> You can assign timestamps in the Kafka source or later. The Flink
>> documentation
>> has a section on why it could be beneficial to assign Watermarks in the
>> Kafka
>> source:
>>
>>   https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>
>> Best,
>> Gary
>>
>> On Wed, Jan 17, 2018 at 5:15 PM, William Saar  wrote:
>>
>>> Hi,
>>> I have a job where we read data from either Kafka or a file (for
>>> testing), decode the entries and flat map them into events, and then add a
>>> timestamp and watermark assigner to the events in a later operation. This
>>> seems to generate periodic watermarks when running from a file, but when
>>> Kafka is the source we barely get any watermark updates. What could be
>>> causing this? (the environment has setAutowatermarkInterval(1000))
>>>
>>> Do we need to do all the timestamp and watermark assignment in the Kafka
>>> source? or should it work to do it in later operations? The events do seem
>>> to get propagated through the pipeline, we're just not getting watermarks...
>>>
>>> Thanks,
>>> William
>>>
>>
>>
>


Re: state.checkpoints.dir

2018-01-22 Thread Hao Sun
We generate flink.conf on the fly, so we can use different values based on
environment.

On Mon, Jan 22, 2018 at 12:53 PM Biswajit Das  wrote:

> Hello ,
>
>  Is there any hack to supply *state.checkpoints.*dir as argument or JVM
> parameter when running locally .  I can change the source
> *CheckpointCoordinator* and make it work , trying to find if there is any
> shortcuts ??
>
> Thank you
> ~ Biswajit
>


state.checkpoints.dir

2018-01-22 Thread Biswajit Das
Hello ,

 Is there any hack to supply *state.checkpoints.*dir as argument or JVM
parameter when running locally .  I can change the source
*CheckpointCoordinator* and make it work , trying to find if there is any
shortcuts ??

Thank you
~ Biswajit


Should multiple apache flink task managers have strong identity? Also, should I point their state.checkpoints.dir to the same HDFS?

2018-01-22 Thread Felipe Cavalcanti
Hi,

I'm deploying flink to kubernetes and I've some doubts...

First one is if the task managers should have strong identity (in
which case I will use statefulsets for deploying them). Second one is
if I should point rocksdb state.checkpoint.dir in all task managers to
the same HDFS path or if each of them should point to their own...

Thanks!


Re: Flink Kinesis Consumer re-reading merged shards upon restart

2018-01-22 Thread Philip Luppens
Hi Gordon,

Yeah, I’d need to confirm with our devops guys that this is the case (by
default, the Kinesis monitoring doesn’t show how many/which shards were
re-ingested, all I remember is seeing the iterator age shooting up again to
the retention horizon, but no clue if this was because of 1 shard, or
more). I do remember we were having issues regardless when there were
closed shards, but I could be wrong.

[1] https://issues.apache.org/jira/browse/FLINK-8484

I’ve created a ticket [1] to track the issue, and I’ll see if I can provide
a small patch against the 1.3 branch.

HTH,

-Phil

On Mon, Jan 22, 2018 at 6:26 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Philip,
>
> Thanks a lot for reporting this, and looking into this in detail.
>
> Your observation sounds accurate to me. The `endingSequenceNumber` would
> no longer be null once a shard is closed, so on restore that would mistaken
> the consumer to think that it’s a new shard and start consuming it from the
> earliest sequence number possible (i.e., treating it as if it is a new
> shard that was created while the job wasn’t running).
>
> I think we haven’t seen other reports on this, yet, because the issue you
> observed seems to only happen in a corner case where you rescaled the
> Kinesis stream while the job was down.
> Could you confirm that assumption? My guess is probably Flink users who
> uses Kinesis have currently only been rescaling Kinesis streams while the
> job was running.
>
> Your workaround is also a valid fix for this bug. Could you file a JIRA
> for this? Would be happy to also review a PR for the fix, if you would like
> to contribute it.
>
> Cheers,
> Gordon
>
>
> On 22 January 2018 at 5:08:36 PM, Philip Luppens (philip.lupp...@gmail.com)
> wrote:
>
> Hi everyone,
>
> For the past weeks, we’ve been struggling with Kinesis ingestion using the
> Flink Kinesis connector, but the seemingly complete lack of similar reports
> makes us wonder if perhaps we misconfigured or mis-used the connector.
>
> We’re using the connector to subscribe to streams varying from 1 to a 100
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis
> stream up and down during peak times. What we’ve noticed is that, while we
> were having closed shards, any Flink job restart with check- or save-point
> would result in shards being re-read from the event horizon, duplicating
> our events.
>
> We started checking the checkpoint state, and found that the shards were
> stored correctly with the proper sequence number (including for closed
> shards), but that upon restarts, the older closed shards would be read from
> the event horizon, as if their restored state would be ignored.
>
> In the end, we believe that we found the problem: in the
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard
> returned from the KinesisDataFetcher against the shards’ metadata from the
> restoration point, but we do this via a containsKey() call, which means
> we’ll use the StreamShardMetadata’s equals() method. However, this checks
> for all properties, including the endingSequenceNumber, which might have
> changed between the restored state’s checkpoint and our data fetch, thus
> failing the equality check, failing the containsKey() check, and resulting
> in the shard being re-read from the event horizon, even though it was
> present in the restored state.
>
> We’ve created a workaround where we only check for the shardId and stream
> name to restore the state of the shards we’ve already seen, and this seems
> to work correctly. However, as pointed out above, the lack of similar
> reports makes us worried that we’ve misunderstood something, so we’d
> appreciate any feedback whether or not our report makes sense before we
> file a bug in the issue tracker.
>
> Much appreciated,
>
> -Phil
>
> --
> "We cannot change the cards we are dealt, just how we play the hand." -
> Randy Pausch
>
>


-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch


Re: Flink Kinesis Consumer re-reading merged shards upon restart

2018-01-22 Thread Tzu-Li (Gordon) Tai
Hi Philip,

Thanks a lot for reporting this, and looking into this in detail.

Your observation sounds accurate to me. The `endingSequenceNumber` would no 
longer be null once a shard is closed, so on restore that would mistaken the 
consumer to think that it’s a new shard and start consuming it from the 
earliest sequence number possible (i.e., treating it as if it is a new shard 
that was created while the job wasn’t running).

I think we haven’t seen other reports on this, yet, because the issue you 
observed seems to only happen in a corner case where you rescaled the Kinesis 
stream while the job was down.
Could you confirm that assumption? My guess is probably Flink users who uses 
Kinesis have currently only been rescaling Kinesis streams while the job was 
running.

Your workaround is also a valid fix for this bug. Could you file a JIRA for 
this? Would be happy to also review a PR for the fix, if you would like to 
contribute it.

Cheers,
Gordon

On 22 January 2018 at 5:08:36 PM, Philip Luppens (philip.lupp...@gmail.com) 
wrote:

Hi everyone,

For the past weeks, we’ve been struggling with Kinesis ingestion using the 
Flink Kinesis connector, but the seemingly complete lack of similar reports 
makes us wonder if perhaps we misconfigured or mis-used the connector.

We’re using the connector to subscribe to streams varying from 1 to a 100 
shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
stream up and down during peak times. What we’ve noticed is that, while we were 
having closed shards, any Flink job restart with check- or save-point would 
result in shards being re-read from the event horizon, duplicating our events.

We started checking the checkpoint state, and found that the shards were stored 
correctly with the proper sequence number (including for closed shards), but 
that upon restarts, the older closed shards would be read from the event 
horizon, as if their restored state would be ignored.

In the end, we believe that we found the problem: in the FlinkKinesisConsumer’s 
run() method, we’re trying to find the shard returned from the 
KinesisDataFetcher against the shards’ metadata from the restoration point, but 
we do this via a containsKey() call, which means we’ll use the 
StreamShardMetadata’s equals() method. However, this checks for all properties, 
including the endingSequenceNumber, which might have changed between the 
restored state’s checkpoint and our data fetch, thus failing the equality 
check, failing the containsKey() check, and resulting in the shard being 
re-read from the event horizon, even though it was present in the restored 
state.

We’ve created a workaround where we only check for the shardId and stream name 
to restore the state of the shards we’ve already seen, and this seems to work 
correctly. However, as pointed out above, the lack of similar reports makes us 
worried that we’ve misunderstood something, so we’d appreciate any feedback 
whether or not our report makes sense before we file a bug in the issue tracker.

Much appreciated,

-Phil

--
"We cannot change the cards we are dealt, just how we play the hand." - Randy 
Pausch


Flink Kinesis Consumer re-reading merged shards upon restart

2018-01-22 Thread Philip Luppens
Hi everyone,

For the past weeks, we’ve been struggling with Kinesis ingestion using the
Flink Kinesis connector, but the seemingly complete lack of similar reports
makes us wonder if perhaps we misconfigured or mis-used the connector.

We’re using the connector to subscribe to streams varying from 1 to a 100
shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis
stream up and down during peak times. What we’ve noticed is that, while we
were having closed shards, any Flink job restart with check- or save-point
would result in shards being re-read from the event horizon, duplicating
our events.

We started checking the checkpoint state, and found that the shards were
stored correctly with the proper sequence number (including for closed
shards), but that upon restarts, the older closed shards would be read from
the event horizon, as if their restored state would be ignored.

In the end, we believe that we found the problem: in the
FlinkKinesisConsumer’s run() method, we’re trying to find the shard
returned from the KinesisDataFetcher against the shards’ metadata from the
restoration point, but we do this via a containsKey() call, which means
we’ll use the StreamShardMetadata’s equals() method. However, this checks
for all properties, including the endingSequenceNumber, which might have
changed between the restored state’s checkpoint and our data fetch, thus
failing the equality check, failing the containsKey() check, and resulting
in the shard being re-read from the event horizon, even though it was
present in the restored state.

We’ve created a workaround where we only check for the shardId and stream
name to restore the state of the shards we’ve already seen, and this seems
to work correctly. However, as pointed out above, the lack of similar
reports makes us worried that we’ve misunderstood something, so we’d
appreciate any feedback whether or not our report makes sense before we
file a bug in the issue tracker.

Much appreciated,

-Phil

-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch


Re: Unable to query MapState

2018-01-22 Thread Kostas Kloudas
Hi Velu,

I would recommend to switch to Flink 1.4 as the queryable state has been 
refactored to be compatible with all types of state.
You can read more here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
 

In addition, a lot of things have been simplified.

And for an example you can use this link: 
https://github.com/apache/flink/blob/a3fd548e9c76c67609bbf159d5fb743d756450b1/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L804

which is directly from the Queryable State IT cases.

Thanks,
Kostas

> On Jan 22, 2018, at 2:38 PM, Velu Mitwa  wrote:
> 
> Hi,
> I am trying to query Flink's MapState from Flink client (1.3.2). I was able 
> to query ValueState but when I tried to query MapState I am getting an 
> exception. 
> 
> java.io.IOException: Unconsumed bytes in the deserialized value. This 
> indicates a mismatch in the value serializers used by the KvState instance 
> and this access.
> at 
> org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
> at 
> com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
> at 
> com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
> at 
> org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
> at 
> org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
> at 
> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
> at 
> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
> 
> Flink Job's Logic
> 
> FlinkKafkaConsumer09 consumer = new 
> FlinkKafkaConsumer09<>(
> "/apps/application-stream:flink-demo", new MerchantApiSchema(), 
> properties);
> 
> DataStream inputEventStream = env.addSource(consumer);
> 
> DataStream> outputStream =
> inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
> .window(SlidingProcessingTimeWindows.of(Time.seconds(120), 
> Time.milliseconds(1000)))
> .sum(2);
> 
> DataStream output = outputStream.keyBy(0).flatMap(new CountEvent());
> 
> output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
> 
> // execute program
> env.execute("Filter Transformation Example");
> 
>   }
> 
> 
>   public static class CreateTuple
>   implements MapFunction> {
> @Override
> public Tuple3 map(MerchantApiEvent input) throws 
> Exception {
>   return new Tuple3(input.getMerchantId(), 
> input.getApiName(), 1L);
> }
> 
>   }
> 
>   public static class CountEvent extends RichFlatMapFunction String, Long>, Long> {
> 
> private transient MapState mapState;
> 
> @Override
> public void flatMap(Tuple3 input, Collector 
> out) throws Exception {
> 
>   mapState.put(input.f1, input.f2);
> 
> }
> 
> @Override
> public void open(Configuration config) {
> 
>   MapStateDescriptor mapStateDesc = new 
> MapStateDescriptor(
>   "mapQuery", TypeInformation.of(new TypeHint() {
>   }), TypeInformation.of(new TypeHint() {
>   }));
>   mapStateDesc.setQueryable("mapQuery");
> 
>   mapState = getRuntimeContext().getMapState(mapStateDesc);
> 
> }
>   }
> 
> 
> Flink Query Client's Logic
> 
> final JobID jobId = JobID.fromHexString(jobIdParam);
> 
> String key = queryStateRequestDto.getKey();
> 
> final Configuration config = new Configuration();
> config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
> config.setInteger(JobManagerOptions.PORT, jobManagerPort);
> 
> HighAvailabilityServices highAvailabilityServices = null;
> try {
>   highAvailabilityServices = 
> HighAvailabilityServicesUtils.createHighAvailabilityServices(
>   config, Executors.newSingleThreadScheduledExecutor(),
>   
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
> } catch (Exception e) {
>   // TODO Auto-generated catch block

Trying to understand why a job is spilling despite of huge memory provided

2018-01-22 Thread Konstantin Gregor
Hello everyone,

I have a question about the spilling behavior of a Flink batch job.

The relevant part is a standard map-reduce, aggregating 4 billion
Tuple3 together via a groupBy(0,1).sum(2).
And there really doesn't happen much else in the job.

The problem is that I don't understand why this job spills to disk. In
this example the spilling is not really an issue, but we run the same
job with much larger datasets, where we simply run out of disk space. So
we're trying to understand better what it spills and what we can do
about it.

In this example, I am running on AWS EMR (Flink 1.3.1) with a machine
with 240GB memory. I tweaked the following parameters:

yarn.heap-cutoff-ratio: 0.1
taskmanager.memory.fraction: 0.9
taskmanager.network.numberOfBuffers: 32768

This leads to 170GB Flink Managed Memory which in my opinion should
suffice for the amount of data (the amount of data going from the
combine to the reduce is roughly 80GB). However, it is spilling over
70GB on disk.

Do you have a hint for me why this could be the case and can explain
what exactly is written into the state on such a group-reduce?

Thank you so much for your input,
best regards

Konstantin


-- 
Konstantin Gregor * konstantin.gre...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Amtsgericht München, HRB 135082


Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-22 Thread Adrian Vasiliu
OK, thanks a lot Fabian.
Adrian
 
- Original message -From: Fabian Hueske To: Adrian Vasiliu Cc: user Subject: Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+Date: Mon, Jan 22, 2018 2:54 PM 
Hi Adrian, thanks for raising this issue again.I agree, we should add support for newer ES versions.
I've added 1.5.0 as target release for FLINK-7386 and bumped the priority up. In the meantime, you can try Flavio's approach (he responded to the mail thread you linked) and fork and fix the connector.You could also try the PR for FLINK-7386 [1] and comment on the pull request whether it works for you or not. Best, Fabian

[1] https://github.com/apache/flink/pull/4675
 
 
2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :

Hello,
 
With a local run of Flink 1.4.0, ElasticsearchSink fails for me with a local run of Elasticsearch 5.6.4 and 5.2.1, while the same code (with adjusted versions of dependencies) works fine with Elasticsearch 2.x (tried 2.4.6).
I get:java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor
 
(env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
 
Now, this looks similar to the issue referred in
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Elasticsearch-Sink-Error-td15246.html
which points to 
"Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client"
https://issues.apache.org/jira/browse/FLINK-7386
Side-remark: when trying with Elasticsearch 5.6.4 via a docker container, for some reason the error I get is different: "RuntimeException: Client is not connected to any Elasticsearch nodes!" (while Elasticsearch 2.4.6 works fine via docker too).
 
FLINK-7386 being pending since August 2017, would it mean that there is nowadays still no way to make Flink 1.4.0's sink work with Elasticsearch 5.2+? My use-case involves Compose for Elasticsearch 5.6.3, shared by different apps, and I can't really downgrade its Elasticsearch version.
Or would there be signs it will be fixed in Flink 1.5.0?
 
Any lights welcome.
 
Thanks,
Adrian
 
 Sauf indication contraire ci-dessus:/ Unless stated otherwise above:Compagnie IBM FranceSiège Social : 17 avenue de l'Europe, 92275 Bois-Colombes CedexRCS Nanterre 552 118 465Forme Sociale : S.A.S.Capital Social : 657.364.587 €SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A
 Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
Compagnie IBM France
Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
RCS Nanterre 552 118 465
Forme Sociale : S.A.S.
Capital Social : 657.364.587 €
SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A



Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-22 Thread Fabian Hueske
Hi Adrian,

thanks for raising this issue again.
I agree, we should add support for newer ES versions.
I've added 1.5.0 as target release for FLINK-7386 and bumped the priority
up.

In the meantime, you can try Flavio's approach (he responded to the mail
thread you linked) and fork and fix the connector.
You could also try the PR for FLINK-7386 [1] and comment on the pull
request whether it works for you or not.

Best, Fabian

[1] https://github.com/apache/flink/pull/4675


2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :

> Hello,
>
> With a local run of Flink 1.4.0, ElasticsearchSink fails for me with a
> local run of Elasticsearch 5.6.4 and 5.2.1, while the same code (with
> adjusted versions of dependencies) works fine with Elasticsearch 2.x (tried
> 2.4.6).
> I get:
> java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.
> BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/
> elasticsearch/action/bulk/BulkProcessor
>
> (env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
>
> Now, this looks similar to the issue referred in
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Elasticsearch-Sink-Error-td15246.html
> which points to
> "Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+
> client"
> https://issues.apache.org/jira/browse/FLINK-7386
>
> Side-remark: when trying with Elasticsearch 5.6.4 via a docker container,
> for some reason the error I get is different: "RuntimeException: Client is
> not connected to any Elasticsearch nodes!" (while Elasticsearch 2.4.6 works
> fine via docker too).
>
> FLINK-7386  being
> pending since August 2017, would it mean that there is nowadays still no
> way to make Flink 1.4.0's sink work with Elasticsearch 5.2+? My use-case
> involves Compose for Elasticsearch 5.6.3, shared by different apps, and I
> can't really downgrade its Elasticsearch version.
> Or would there be signs it will be fixed in Flink 1.5.0?
>
> Any lights welcome.
>
> Thanks,
> Adrian
>
>
> Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
> Compagnie IBM France
> Siège Social : 17 avenue de l'Europe
> ,
> 92275 Bois-Colombes Cedex
> RCS Nanterre 552 118 465
> Forme Sociale : S.A.S.
> Capital Social : 657.364.587 €
> SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A
>


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-22 Thread Edward
Also, I'm not sure if this would cause the uninitialized error, but I did
notice that in the maven dependency tree there are 2 different versions of
kyro listed as Flink dependencies:
 flink-java 1.4 requires kyro 2.24, but flink-streaming-java_2.11 requires
kyro 2.21:

[INFO] +- org.apache.flink:flink-java:jar:1.4.0:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.4.0:compile
[INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile

[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.4.0:compile
[INFO] |  +- (org.apache.flink:flink-core:jar:1.4.0:compile - omitted for
duplicate)
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.4.0:compile
[INFO] |  |  +- com.twitter:chill_2.11:jar:0.7.4:compile
[INFO] |  |  |  +- com.twitter:chill-java:jar:0.7.4:compile
[INFO] |  |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile -
omitted for conflict with 2.24.0)
[INFO] |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile -
omitted for conflict with 2.24.0)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: GetExecutionPlan fails with IllegalArgumentException in Comparator

2018-01-22 Thread Fabian Hueske
Hi Julian,

I searched for the issue in JIRA [1] but did not find a corresponding issue.
Could you open an issue for this bug?

Thank you,
Fabian

[1] https://issues.apache.org/jira/projects/FLINK/summary

2018-01-22 14:11 GMT+01:00 Bauss, Julian :

> Hello everybody,
>
>
>
> we‘re currently encountering an exception while generating an
> ExecutionGraph JSON in Flink v1.3.2.
>
> Actually executing the job does not cause an exception and everything
> works as inteded.
>
>
>
> This happens since we started adding side-outputs to many of our operators.
>
>
>
> Is this already a known bug?
>
>
>
> Below is the stacktrace. The problem seems to be a contract violation in
> the comparator implementation.
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:545)
>
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:381)
>
> at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:838)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1086)
>
> at org.apache.flink.client.CliFrontend$2.call(
> CliFrontend.java:1133)
>
> at org.apache.flink.client.CliFrontend$2.call(
> CliFrontend.java:1130)
>
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1656)
>
> at org.apache.flink.runtime.security.HadoopSecurityContext.
> runSecured(HadoopSecurityContext.java:40)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
>
> Caused by: java.lang.RuntimeException: JSON plan creation failed
>
> at org.apache.flink.streaming.api.graph.StreamGraph.
> getStreamingPlanAsJSON(StreamGraph.java:668)
>
> at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.
> java:1538)
>
> at com.example.Main.main(Main.java:262)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:528)
>
> ... 13 more
>
> Caused by: java.lang.IllegalArgumentException: Comparison method violates
> its general contract!
>
> at java.util.TimSort.mergeLo(TimSort.java:777)
>
> at java.util.TimSort.mergeAt(TimSort.java:514)
>
> at java.util.TimSort.mergeCollapse(TimSort.java:441)
>
> at java.util.TimSort.sort(TimSort.java:245)
>
> at java.util.Arrays.sort(Arrays.java:1512)
>
> at java.util.ArrayList.sort(ArrayList.java:1454)
>
> at java.util.Collections.sort(Collections.java:175)
>
> at org.apache.flink.streaming.api.graph.JSONGenerator.
> getJSON(JSONGenerator.java:60)
>
> at org.apache.flink.streaming.api.graph.StreamGraph.
> getStreamingPlanAsJSON(StreamGraph.java:665)
>
> ... 21 more
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
> 
> **
>
> bonprix Handelsgesellschaft mbH
> Sitz der Gesellschaft: Hamburg
>
> Geschäftsführung:
> Markus Fuchshofen
> Dr. Richard Gottwald
> Dr. Kai Heck
> Rien Jansen
> Beiratsvorsitzender: Dr. Marcus Ackermann
>
> Handelsregister AG Hamburg HR B 36 455
>
> Adresse:
>
> bonprix Handelsgesellschaft mbH
>
> Haldesdorfer Str. 61
> 
> 22179 Hamburg
>
> Diese E-Mail enthält vertrauliche und/oder rechtlich geschützte
> Informationen.
> Wenn Sie nicht der richtige Adressat sind oder diese E-Mail irrtümlich
> erhalten haben,
> informieren Sie bitte sofort den Absender und vernichten Sie diese Mail.
> Das unerlaubte Kopieren sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet.
>
> This e-mail may contain confidential and/or privileged information.
> If you are not the intended recipient (or have received the e-mail in
> error)
> please notify the sender immediately and delete this e-mail. Any
> unauthorized copying,
> disclosure or distribution of the material in this e-mail is strictly
> forbidden.
>
> **

Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-22 Thread Edward
(resubmission of a previous post, since the stack trace didn't show up last
time)

We're attempting to upgrade our 1.3.2 cluster and jobs to 1.4.0. When 
submitting jobs to the 1.4.0 Kafka cluster, they fail with a Kryo 
registration error. 

My jobs are consuming from Kafka topics with messages in Avro format. The 
avro schemas are registered with a Confluent avro schema registry. For 
ingestion, we've been using the KafkaDeserializerWrapper class from this 
pull request: https://github.com/apache/flink/pull/2705

In the pom.xml, I added a new dependency for flink-avro, and upgraded all 
other maven dependencies to version 1.4.0 

Here's the error: 

java.lang.VerifyError: Bad type on operand stack
  Exception Details:
Location:


org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V
@23: invokespecial
  Reason:
Type
'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
(current frame, stack[7]) is not assignable to
'com/esotericsoftware/kryo/Serializer'
  Current Frame:
bci: @23
flags: { }
locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils',
'java/util/LinkedHashMap' }
stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6,
uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12,
'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
}
  Bytecode:
0x000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
0x010: bb00 0659 b700 0eb7 000f b700 10b6 0011
0x020: 57b1   

at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at
org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.(KryoSerializer.java:119)
at
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at
org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
at
org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Here are the dependencies: 




org.apache.flink
flink-java
1.4



org.apache.flink
flink-streaming-java_2.11
1.4



org.apache.flink
flink-avro
1.4



org.apache.flink
flink-clients_2.11
1.4



org.apache.flink
flink-connector-filesystem_2.11
1.4



org.apache.flink
flink-connector-kafka-0.11_2.11
1.4



org.apache.flink
flink-metrics-statsd
1.4


   
io.confluent
kafka-avro-serializer
3.3.1










--
Sent from: http://apache-flink-user-mailing-list-archive

Unable to query MapState

2018-01-22 Thread Velu Mitwa
Hi,
I am trying to query Flink's MapState from Flink client (1.3.2). I was able
to query ValueState but when I tried to query MapState I am getting an
exception.

java.io.IOException: Unconsumed bytes in the deserialized value. This
indicates a mismatch in the value serializers used by the KvState instance
and this access.
at
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
at
com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
at
com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

*Flink Job's Logic*

   * FlinkKafkaConsumer09 consumer = new
FlinkKafkaConsumer09<>(*
*"/apps/application-stream:flink-demo", new MerchantApiSchema(),
properties);*

*DataStream inputEventStream =
env.addSource(consumer);*

*DataStream> outputStream =*
*inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)*
*.window(SlidingProcessingTimeWindows.of(Time.seconds(120),
Time.milliseconds(1000)))*
*.sum(2);*

*DataStream output = outputStream.keyBy(0).flatMap(new
CountEvent());*

*output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);*

*// execute program*
*env.execute("Filter Transformation Example");*

*  }*


*  public static class CreateTuple*
*  implements MapFunction> {*
*@Override*
*public Tuple3 map(MerchantApiEvent input) throws
Exception {*
*  return new Tuple3(input.getMerchantId(),
input.getApiName(), 1L);*
*}*

*  }*

*  public static class CountEvent extends
RichFlatMapFunction, Long> {*

*private transient MapState mapState;*

*@Override*
*public void flatMap(Tuple3 input,
Collector out) throws Exception {*

*  mapState.put(input.f1, input.f2);*

*}*

*@Override*
*public void open(Configuration config) {*

*  MapStateDescriptor mapStateDesc = new
MapStateDescriptor(*
*  "mapQuery", TypeInformation.of(new TypeHint() {*
*  }), TypeInformation.of(new TypeHint() {*
*  }));*
*  mapStateDesc.setQueryable("mapQuery");*

*  mapState = getRuntimeContext().getMapState(mapStateDesc);*

*}*
*  }*


*Flink Query Client's Logic*

*final JobID jobId = JobID.fromHexString(jobIdParam);*

*String key = queryStateRequestDto.getKey();*

*final Configuration config = new Configuration();*
*config.setString(JobManagerOptions.ADDRESS, jobManagerHost);*
*config.setInteger(JobManagerOptions.PORT, jobManagerPort);*

*HighAvailabilityServices highAvailabilityServices = null;*
*try {*
*  highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(*
*  config, Executors.newSingleThreadScheduledExecutor(),*
*
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*
*} catch (Exception e) {*
*  // TODO Auto-generated catch block*
*  e.printStackTrace();*
*}*

*try {*
*  QueryableStateClient client = new QueryableStateClient(config,
highAvailabilityServices);*

*  final TypeSerializer keySerializer = TypeInformation.of(new
TypeHint() {*
*  }).createSerializer(new ExecutionConfig());*
*  final TypeSerializer> valueSerializer =*
*  TypeInformation.of(new TypeHint>() {*
*  }).createSerializer(new ExecutionConfig());*

*  final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(key,*
*  keySerializer, VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);*

*  scala.concurrent.Future serializedResult =*
*  client.getKvState(jobId, "mapQuery", key.hashCode(),
serializedKey);*

*  // now wait for the result and return it*
*  final FiniteDuration duration = new FiniteDuration(1,
TimeUnit.SECONDS);*
*  byte[] serializedValue = Await.result(serializedResult, duration);*
*  Map value =*
*

Re: NoClassDefFoundError of a Avro class after cancel then resubmit the same job

2018-01-22 Thread Edward
Yes, we've seen this issue as well, though it usually takes many more
resubmits before the error pops up. Interestingly, of the 7 jobs we run (all
of which use different Avro schemas), we only see this issue on 1 of them.
Once the NoClassDefFoundError crops up though, it is necessary to recreate
the task managers.

There's a whole page on the Flink documentation on debugging classloading,
and Avro is mentioned several times on that page:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html

It seems that (in 1.3 at least) each submitted job has its own classloader,
and its own instance of the Avro class definitions. However, the Avro class
cache will keep references to the Avro classes from classloaders for the
previous cancelled jobs. That said, we haven't been able to find a solution
to this error yet. Flink 1.4 would be worth a try because the of the changes
to the default classloading behaviour (child-first is the new default, not
parent-first).





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


GetExecutionPlan fails with IllegalArgumentException in Comparator

2018-01-22 Thread Bauss, Julian
Hello everybody,

we‘re currently encountering an exception while generating an ExecutionGraph 
JSON in Flink v1.3.2.
Actually executing the job does not cause an exception and everything works as 
inteded.

This happens since we started adding side-outputs to many of our operators.

Is this already a known bug?

Below is the stacktrace. The problem seems to be a contract violation in the 
comparator implementation.

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.RuntimeException: JSON plan creation failed
at 
org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:668)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1538)
at com.example.Main.main(Main.java:262)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
... 13 more
Caused by: java.lang.IllegalArgumentException: Comparison method violates its 
general contract!
at java.util.TimSort.mergeLo(TimSort.java:777)
at java.util.TimSort.mergeAt(TimSort.java:514)
at java.util.TimSort.mergeCollapse(TimSort.java:441)
at java.util.TimSort.sort(TimSort.java:245)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.ArrayList.sort(ArrayList.java:1454)
at java.util.Collections.sort(Collections.java:175)
at 
org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:60)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:665)
... 21 more

Best Regards,

Julian


**

bonprix Handelsgesellschaft mbH
Sitz der Gesellschaft: Hamburg

Geschäftsführung:
Markus Fuchshofen
Dr. Richard Gottwald
Dr. Kai Heck
Rien Jansen
Beiratsvorsitzender: Dr. Marcus Ackermann

Handelsregister AG Hamburg HR B 36 455

Adresse:

bonprix Handelsgesellschaft mbH

Haldesdorfer Str. 61
22179 Hamburg

Diese E-Mail enthält vertrauliche und/oder rechtlich geschützte Informationen.
Wenn Sie nicht der richtige Adressat sind oder diese E-Mail irrtümlich erhalten 
haben,
informieren Sie bitte sofort den Absender und vernichten Sie diese Mail.
Das unerlaubte Kopieren sowie die unbefugte Weitergabe dieser E-Mail ist nicht 
gestattet.

This e-mail may contain confidential and/or privileged information.
If you are not the intended recipient (or have received the e-mail in error)
please notify the sender immediately and delete this e-mail. Any unauthorized 
copying,
disclosure or distribution of the material in this e-mail is strictly forbidden.

**



ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-22 Thread Adrian Vasiliu
Hello,
 
With a local run of Flink 1.4.0, ElasticsearchSink fails for me with a local run of Elasticsearch 5.6.4 and 5.2.1, while the same code (with adjusted versions of dependencies) works fine with Elasticsearch 2.x (tried 2.4.6).
I get:java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor
 
(env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
 
Now, this looks similar to the issue referred in
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Elasticsearch-Sink-Error-td15246.html
which points to 
"Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client"
https://issues.apache.org/jira/browse/FLINK-7386
Side-remark: when trying with Elasticsearch 5.6.4 via a docker container, for some reason the error I get is different: "RuntimeException: Client is not connected to any Elasticsearch nodes!" (while Elasticsearch 2.4.6 works fine via docker too).
 
FLINK-7386 being pending since August 2017, would it mean that there is nowadays still no way to make Flink 1.4.0's sink work with Elasticsearch 5.2+? My use-case involves Compose for Elasticsearch 5.6.3, shared by different apps, and I can't really downgrade its Elasticsearch version.
Or would there be signs it will be fixed in Flink 1.5.0?
 
Any lights welcome.
 
Thanks,
Adrian
 
 Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
Compagnie IBM France
Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
RCS Nanterre 552 118 465
Forme Sociale : S.A.S.
Capital Social : 657.364.587 €
SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A



akka.pattern.AskTimeoutException: Ask timed out (after upgrading to flink 1.4.0)

2018-01-22 Thread Bart Kastermans
I have upgraded to flink-1.4.0, with just local task and job manager 
(flink/bin/start-cluster.sh).  After
solving the dependency issues, I now get the below error consistently on a 
specific job.  As this means
absolutely nothing to me (other than that I realise flink uses akka), I have no 
idea where to start
debugging.

The job that errors this way, reads from kafka with FlinkKafkaConsumer010 and 
writes to postgres
over a JDBC connection.

- bart


Stack trace:

java.lang.Exception: Cannot deploy task Source: Custom Source -> 
com.kpn.datalab.inhome.StoreInKV$ mapAddKrnToEvent -> Sink: Unnamed (1/1) 
(676bd6fcaff976cd72ea3672c45354ce) - TaskManager 
(2855fef5d0fb313cdb47fbd6350f81aa @ flink-1682409794-3n335 (dataPort=37534)) 
not responding after a timeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$3(Execution.java:529)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@flink-1682409794-3n335:42536/user/taskmanager#262833087]]
 after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
... 1 more





job code:



package com.datalab.inhome

import java.util.Properties
import java.util.function.BiConsumer

import com.datalab.SchemaStore
import com.datalab.commons.Utils._
import com.datalab.commons.flink.ByteArraySerializer
import com.datalab.commons.flink.ByteArraySerializer.MyByteArray

import com.datalab.schemas.inhome.model_result.v1_1_1.ModelResult
import org.apache.flink.api.common.functions.RichMapFunction

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.json.JSONObject
import org.slf4j.{Logger, LoggerFactory}

class mapAddUserId(schemaBaseDirectory: String) extends 
RichMapFunction[MyByteArray, (String, MyByteArray)] {
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  override def open(parameters: Configuration): Unit = {
super.open(parameters)
SchemaStore.initialize(schemaBaseDirectory)
  }

  override def map(value: MyByteArray): (String, MyByteArray) = {
val event = SchemaStore.instance().deserializeEvent(value.bytes)
val userId = event.payload(classOf[ModelResult]).getProfile.getKrn.toString
(userId, value)
  }
}

object StoreInKV {
  val log: Logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
val parameters = ParameterTool.fromArgs(args)
val setup = RaeUtils.setup()
val sourceTopic = parameters.get("sourceTopic","modelResult")

val kvTableName = parameters.get("kvTableName", "inhome_model_scores")

val jobDescription = new JSONObject()
parameters.toMap.forEach(new BiConsumer[String, String] {
  override def accept(t: String, u: String): Unit = jobDescription.put(t, u)
})
jobDescription.put("jar", this.getClass.getName)

log.info("** Settings 
")
log.info(s"Job ${this.getClass} sta

Re: BucketingSink broken in flink 1.4.0 ?

2018-01-22 Thread Stephan Ewen
Hi!

Thanks for diagnosing this - the fix you suggested is correct.

Can you still share some of the logs indicating where it fails? The reason
is that the fallback code path (using "hdfs://localhost:12345") should not
really try to connect to a local HDFS, but simply use this as a placeholder
URI to get the correct configuration for Hadoop's file system.

Best,
Stephan


On Wed, Jan 10, 2018 at 2:17 PM, jelmer  wrote:

> Hi I am trying to convert some jobs from flink 1.3.2 to flink 1.4.0
>
> But i am running into the issue that the bucketing sink will always try
> and connect to hdfs://localhost:12345/ instead of the hfds url i have
> specified in the constructor
>
> If i look at the code at
>
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-filesystem/src/main/java/org/
> apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
>
>
> It tries to create the hadoop filesystem like this
>
> final org.apache.flink.core.fs.FileSystem flinkFs =
> org.apache.flink.core.fs.FileSystem.get(path.toUri());
> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
> ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
>
> But FileSystem.getUnguardedFileSystem will always return a
>
>
> But FileSystem.get will always return a SafetyNetWrapperFileSystem so the
> instanceof check will never indicate that its a hadoop filesystem
>
>
> Am i missing something or is this a bug and if so what would be the
> correct fix ? I guess replacing FileSystem.get with 
> FileSystem.getUnguardedFileSystem
> would fix it but I am afraid I lack the context to know if that would be
> safe
>


Re: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-22 Thread Timo Walther
According to Stackoverflow 
(https://stackoverflow.com/questions/13269564/java-lang-classcastexception-oracle-sql-timestamp-cannot-be-cast-to-java-sql-ti) 
setting the system property accordingly should work.


Maybe you can share how you do it?

Regards,
Timo


Am 1/22/18 um 11:45 AM schrieb Puneet Kinra:
I am getting this issue while reading from the database using jdbc 
connector , can you guide me how to read
from there as  a string or may mapped to another type while reading 
from the database


On Mon, Jan 22, 2018 at 4:12 PM, Timo Walther > wrote:


Hi Puneet,

Flink SQL does only supports java.sql.Timestamp. You need to
convert it in a user-defined function or map function accordingly.

Regards,
Timo



Am 1/22/18 um 11:38 AM schrieb Puneet Kinra:

Hi
I am getting the above error when deployed to the cluster ,trying
to set the System Property but not getting reflected inside the jobs.

I need to schedule the job as well on periodic basis , i was
thinking of calling the
jar from the CLI & put into script & schedule using cron job in
linux

but stuck with the below issue.


java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast
to java.sql.Timestamp
        at

org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.serialize(SqlTimestampSerializer.java:27)
        at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
        at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
        at

org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io

.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
        at org.apache.flink.runtime.io

.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
        at org.apache.flink.runtime.io

.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at

org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        at

org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at

org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168)
        at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Unknown Source)

-- 
*Cheers *

*
*
*Puneet Kinra*
*
*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*







--
*Cheers *
*
*
*Puneet Kinra*
*
*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com 
*


*e-mail :puneet.ki...@customercentria.com 
*







Re: java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-22 Thread Timo Walther

Hi Puneet,

Flink SQL does only supports java.sql.Timestamp. You need to convert it 
in a user-defined function or map function accordingly.


Regards,
Timo



Am 1/22/18 um 11:38 AM schrieb Puneet Kinra:

Hi
I am getting the above error when deployed to the cluster ,trying to 
set the System Property but not getting reflected inside the jobs.


I need to schedule the job as well on periodic basis , i was thinking 
of calling the

jar from the CLI & put into script & schedule using cron job in linux

but stuck with the below issue.


java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to 
java.sql.Timestamp
        at 
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.serialize(SqlTimestampSerializer.java:27)
        at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
        at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
        at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Unknown Source)

--
*Cheers *
*
*
*Puneet Kinra*
*
*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com 
*


*e-mail :puneet.ki...@customercentria.com 
*







java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to java.sql.Timestamp

2018-01-22 Thread Puneet Kinra
Hi

I am getting the above error when deployed to the cluster ,trying to set
the System Property but not getting reflected inside the jobs.

I need to schedule the job as well on periodic basis , i was thinking of
calling the
jar from the CLI & put into script & schedule using cron job in linux

but stuck with the below issue.


java.lang.ClassCastException: oracle.sql.TIMESTAMP cannot be cast to
java.sql.Timestamp
at
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.serialize(SqlTimestampSerializer.java:27)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Far too few watermarks getting generated with Kafka source

2018-01-22 Thread Fabian Hueske
Hi William,

The TsExtractor looks good.
This sounds like a strange behavior and should not (or only indirectly) be
related to the Kafka source since the WMs are generated by a separate
extractor.

- Did you compare the first (and only) generated watermark to the
timestamps of the records that are produced by the sources?
It might be far ahead of the timestamps in the records and won't be updated
because the timestamps of the records are smaller.

- What is the parallelism of the file sources / Kafka source and following
operators?
Watermarks can only advance if they advance in all parallel instance of the
timestamp extractor.

Best, Fabian

2018-01-18 16:09 GMT+01:00 William Saar :

> Hi,
> The watermark does not seem to get updated at all after the first one is
> emitted. We used to get out-of-order warnings, but we changed to job to
> support a bounded timestamp extractor so we no longer get those warnings.
>
> Our timestamp extractor looks like this
>
> class TsExtractor[T](time : Time) extends 
> BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time : Time) {
> override def extractTimestamp(element: Timestamped[T]): Long = 
> element.timestamp
> }
>
> Our stream topology starts with a single stream, then we do two separate flat 
> map and filtering operations on the initial stream to transform data batches
> into streams of two different event types. We then 
> assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20))) 
> for each event type on both
> branches before unioning the two branches to a single stream again (the 
> reason for the split is that the data used to come from two different topics).
>
> William
>
>
>
>
> - Original Message -
> From:
> "Gary Yao" 
>
> To:
> "William Saar" 
> Cc:
> "user" 
> Sent:
> Thu, 18 Jan 2018 11:11:17 +0100
> Subject:
> Re: Far too few watermarks getting generated with Kafka source
>
>
>
> Hi William,
>
> How often does the Watermark get updated? Can you share your code that
> generates
> the watermarks? Watermarks should be strictly ascending. If your code
> produces
> watermarks that are not ascending, smaller ones will be discarded. Could
> it be
> that the events in Kafka are more "out of order" with respect to event
> time than
> in your file?
>
> You can assign timestamps in the Kafka source or later. The Flink
> documentation
> has a section on why it could be beneficial to assign Watermarks in the
> Kafka
> source:
>
>   https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_
> timestamps_watermarks.html#timestamps-per-kafka-partition
>
> Best,
> Gary
>
> On Wed, Jan 17, 2018 at 5:15 PM, William Saar  wrote:
>
>> Hi,
>> I have a job where we read data from either Kafka or a file (for
>> testing), decode the entries and flat map them into events, and then add a
>> timestamp and watermark assigner to the events in a later operation. This
>> seems to generate periodic watermarks when running from a file, but when
>> Kafka is the source we barely get any watermark updates. What could be
>> causing this? (the environment has setAutowatermarkInterval(1000))
>>
>> Do we need to do all the timestamp and watermark assignment in the Kafka
>> source? or should it work to do it in later operations? The events do seem
>> to get propagated through the pipeline, we're just not getting watermarks...
>>
>> Thanks,
>> William
>>
>
>


flink read hdfs file error

2018-01-22 Thread ??????
Dear All
   I have a question about  Flink&Hadoop.
I want to read the files on HDFS by flink,but I encountered an error as 
follows,can you please advise the solution about this problem. It will be much 
appreciated.:
 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Exception in thread "main" 
com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ 
jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf:
 804: Could not resolve substitution to a value: ${akka.stream.materializer}
at 
com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at 
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at 
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at 
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at 
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at 
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at 
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at 
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at 
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at 
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at 
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at 
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at 
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at 
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
.




my code===
public class App {

public static void main(String[] args) throws Exception {

final String inputPath = args[0]//hdfs file path;
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

HadoopInputFormat hadoopInputFormat =
new HadoopInputFormat(new 
TextInputFormat(),LongWritable.class,
Text.class,new JobConf());
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new 
Path(inputPath));

DataSet> text = 
env.createInput(hadoopInputFormat);

text.print();

env.execute("read hdfs by flink test");
}

}
==maven 
dependencies===

1.4.0
1.2.17
2.4.20




org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-clients_2.11
${flink.version}


org.apache.flink
flink-hadoop-compatibility_2.11
${flink.version}



log4j
log4j
${log4j.version}


 
Best wishes



Thanks