Re: Hadoop dependency configuration issue

2024-04-01 Thread Biao Geng
Hi fengqi,

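The error “Hadoop is not in the classpath/dependencies.” means that classes
required for HDFS, such as org.apache.hadoop.conf.Configuration and
org.apache.hadoop.fs.FileSystem, could not be found.

If Hadoop is installed in your environment, the classpath is usually set like this:
export HADOOP_CLASSPATH=`hadoop classpath`

If you are submitting to a local standalone Flink cluster, check the log files that Flink generates. They print the classpath, so you can see whether it contains any Hadoop-related classes.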
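
The same check can be sketched in code. The class below is a hypothetical helper, not part of Flink or Hadoop: it probes whether the two classes named above can be loaded and lists the classpath entries that mention "hadoop". It only reports on the JVM it runs in, so it is meaningful only when started with the same classpath as the Flink processes.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; the class and method names are illustrative.
class HadoopClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    // initialize=false avoids running static initializers during the probe.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, HadoopClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Filters a classpath string down to the entries whose path mentions "hadoop".
    static List<String> hadoopEntries(String classpath) {
        List<String> hits = new ArrayList<>();
        for (String entry : classpath.split(File.pathSeparator)) {
            if (entry.toLowerCase().contains("hadoop")) {
                hits.add(entry);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.fs.FileSystem"
        };
        for (String name : required) {
            System.out.println(name + " loadable: " + isLoadable(name));
        }
        System.out.println("Hadoop entries: " + hadoopEntries(System.getProperty("java.class.path")));
    }
}
```

If both classes report as not loadable and no Hadoop entries are listed, HADOOP_CLASSPATH was most likely not exported in the shell that started the cluster.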

Best,
Biao Geng


On Tue, Apr 2, 2024 at 10:24 AM ha.fen...@aisino.com wrote:

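> 1. In the development environment, with the hadoop-client dependency added, checkpointing can access the HDFS path.
> 2. With Flink 1.19.0 and Hadoop 3.3.1, submitting the jar to a single-node Flink setup produces the following error: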
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
> ... 3 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:364)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:273)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:503)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:334)
> at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:173)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:381)
> at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:224)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
> at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
> at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
> at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 3 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme '

Re: [DISCUSS] Hadoop 2 vs Hadoop 3 usage

2024-01-15 Thread Yang Wang
I can share some metrics from Alibaba Cloud EMR clusters.
The ratio of Hadoop 2 to Hadoop 3 is 1:3.


Best,
Yang

On Thu, Dec 28, 2023 at 8:16 PM Martijn Visser 
wrote:

> Hi all,
>
> I want to get some insight into how many users are still using Hadoop 2
> versus Hadoop 3. Flink currently requires a
> minimum version of Hadoop 2.10.2 for certain features, but also
> extensively uses Hadoop 3 (for example, for the file system implementations).
>
> Hadoop 2 has a large number of direct and indirect vulnerabilities
> [1]. Most of them can only be resolved by dropping support for Hadoop
> 2 and upgrading to a Hadoop 3 version. This thread is primarily meant to
> find out whether Hadoop 2 is still commonly used, or whether we can
> actually start discussing dropping support for Hadoop 2 in Flink.
>
> Best regards,
>
> Martijn
>
> [1]
> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/2.10.2
>


[DISCUSS] Hadoop 2 vs Hadoop 3 usage

2023-12-28 Thread Martijn Visser
Hi all,

I want to get some insight into how many users are still using Hadoop 2
versus Hadoop 3. Flink currently requires a
minimum version of Hadoop 2.10.2 for certain features, but also
extensively uses Hadoop 3 (for example, for the file system implementations).

Hadoop 2 has a large number of direct and indirect vulnerabilities
[1]. Most of them can only be resolved by dropping support for Hadoop
2 and upgrading to a Hadoop 3 version. This thread is primarily meant to
find out whether Hadoop 2 is still commonly used, or whether we can
actually start discussing dropping support for Hadoop 2 in Flink.

Best regards,

Martijn

[1] https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/2.10.2


Re: Hadoop Error on ECS Fargate

2023-07-17 Thread Martijn Visser
Hi Mengxi Wang,

Which Flink version are you using?

Best regards,

Martijn

On Thu, Jul 13, 2023 at 3:21 PM Wang, Mengxi X via user <
user@flink.apache.org> wrote:

> Hi community,
>
> We got this Kerberos error with Hadoop as the file system on an ECS Fargate
> deployment.
>
> Caused by: org.apache.hadoop.security.KerberosAuthException: failure to
> login: javax.security.auth.login.LoginException:
> java.lang.NullPointerException: invalid null input: name
>
> Caused by: javax.security.auth.login.LoginException:
> java.lang.NullPointerException: invalid null input: name
>
> We don’t actually need Kerberos authentication, so I’ve added properties to
> flink-config.yaml to disable Hadoop Kerberos authentication, and I can see
> from the logs that they’ve been picked up. But the errors still persist.
> Can anybody help, please?
>
> Best wishes,
>
> Mengxi Wang
>
>
>
> This message is confidential and subject to terms at:
> https://www.jpmorgan.com/emaildisclaimer including on confidential,
> privileged or legal entity information, malicious content and monitoring of
> electronic messages. If you are not the intended recipient, please delete
> this message and notify the sender immediately. Any unauthorized use is
> strictly prohibited.
>


Hadoop Error on ECS Fargate

2023-07-13 Thread Wang, Mengxi X via user
Hi community,

We got this Kerberos error with Hadoop as the file system on an ECS Fargate
deployment.

Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login:
javax.security.auth.login.LoginException: java.lang.NullPointerException:
invalid null input: name

Caused by: javax.security.auth.login.LoginException:
java.lang.NullPointerException: invalid null input: name

We don't actually need Kerberos authentication, so I've added properties to
flink-config.yaml to disable Hadoop Kerberos authentication, and I can see from
the logs that they've been picked up. But the errors still persist. Can anybody
help, please?

Best wishes,
Mengxi Wang




Re: fail to mount hadoop-config-volume when using flink-k8s-operator

2022-10-13 Thread Yang Wang
Currently, exporting the "HADOOP_CONF_DIR" environment variable only works
with the native K8s integration. The Flink client will try to create the
hadoop-config-volume automatically if it finds the Hadoop environment variable.

If you want to set HADOOP_CONF_DIR in the Docker image, please also
make sure the specified Hadoop conf directory exists in the image.

For flink-k8s-operator, another feasible solution is to create a
hadoop-config-configmap manually and then use
"kubernetes.hadoop.conf.config-map.name" to mount it to the JobManager
and TaskManager pods.


Best,
Yang

On Wed, Oct 12, 2022 at 4:11 PM Liting Liu (litiliu) wrote:

> Hi, community:
>   I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The
> "HADOOP_CONF_DIR" environment variable is set in the image that I
> built from flink:1.15.  I found that the taskmanager pod was trying to mount
> a volume named "hadoop-config-volume" from a configMap, but the configMap
> with the name "hadoop-config-volume" wasn't created.
>
> Do I need to remove the "HADOOP_CONF_DIR" environment variable from the
> Dockerfile?
> If yes, what should I do to specify the Hadoop conf?
>
>


fail to mount hadoop-config-volume when using flink-k8s-operator

2022-10-12 Thread Liting Liu (litiliu)
Hi, community:
  I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The
"HADOOP_CONF_DIR" environment variable is set in the image that I built
from flink:1.15.  I found that the taskmanager pod was trying to mount a volume
named "hadoop-config-volume" from a configMap, but the configMap with the name
"hadoop-config-volume" wasn't created.

Do I need to remove the "HADOOP_CONF_DIR" environment variable from the Dockerfile?
If yes, what should I do to specify the Hadoop conf?



Setting boundedness for legacy Hadoop sequence file sources

2022-05-03 Thread Ken Krugler
Hi all,

I’m converting several batch Flink workflows to streaming, with bounded sources.

Some of our sources are reading Hadoop sequence files via 
StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource is returning 
true, because the LegacySourceTransformation for this source says it’s 
CONTINUOUS_UNBOUNDED. So the workflow fails to run, because I’ve set the 
execution mode to batch.

The root cause is that StreamExecutionEnvironment.createInput() checks if the 
input format extends FileInputFormat, and only sets up a bounded source if it 
does. HadoopInputFormat doesn’t extend FileInputFormat, so boundedness gets set 
to CONTINUOUS_UNBOUNDED, which is wrong.

This looks like a bug in StreamExecutionEnvironment.createInput(), though I'm
not sure how best to fix it. Relying on class checks feels brittle.
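
To make the brittleness concrete, here is a simplified, self-contained model of the behavior described above. It is not Flink's actual code: the nested types are stand-ins that only reproduce the inheritance relationship, and the BoundednessAware interface is a purely hypothetical alternative in which a format declares its own boundedness.

```java
// Simplified model; all types here are illustrative stand-ins, not Flink classes.
class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    interface InputFormat {}

    static class FileInputFormat implements InputFormat {}

    // Reads a finite input, but does not extend FileInputFormat.
    static class HadoopInputFormat implements InputFormat {}

    // Mirrors the class check described above: anything that is not a
    // FileInputFormat is treated as an unbounded source.
    static Boundedness byClassCheck(InputFormat format) {
        return (format instanceof FileInputFormat)
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    // Hypothetical alternative: the format itself declares its boundedness.
    interface BoundednessAware {
        Boundedness getBoundedness();
    }

    static class BoundedHadoopInputFormat extends HadoopInputFormat implements BoundednessAware {
        @Override
        public Boundedness getBoundedness() {
            return Boundedness.BOUNDED;
        }
    }

    static Boundedness byDeclaration(InputFormat format) {
        if (format instanceof BoundednessAware) {
            return ((BoundednessAware) format).getBoundedness();
        }
        return byClassCheck(format); // fall back to the class-based heuristic
    }

    public static void main(String[] args) {
        System.out.println(byClassCheck(new HadoopInputFormat()));
        System.out.println(byDeclaration(new BoundedHadoopInputFormat()));
    }
}
```

In this model the class check misclassifies the Hadoop-backed format, while the declaration-based variant lets any format opt in without changing the check itself.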

Regards,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-09 Thread Igal Shilman
Hi Fil,
I've replied in the JIRA

Cheers,
Igal.

On Tue, Mar 8, 2022 at 6:08 PM Filip Karnicki 
wrote:

> Hi Roman, Igal (@ below)
>
> Thank you for your answer. I don't think I'll have access to flink's lib
> folder given it's a shared Cloudera cluster. The only thing I could think
> of is to not include com.google.protobuf in the
> classloader.parent-first-patterns.additional setting, and
> including protobuf-java 3.7.1 in the uber jar.
>
> I created a jira for this just now + a discuss thread on the dev group
> https://issues.apache.org/jira/browse/FLINK-26537
>
> Hi @Igal Shilman  , is the plugin solution
> outlined by Roman something that fits in better with Statefun than having
> the creators of uber .jars be responsible for using a statefun-compatible
> protobuf-java?
>
> Kind regards
> Fil
>
> On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan  wrote:
>
>> Hi Filip,
>>
>> Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
>> Or maybe re-writing the dependencies you mentioned to be loaded as
>> plugins? [1]
>>
>> I don't see any other ways to solve this problem.
>> Probably Chesnay or Seth will suggest a better solution.
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki 
>> wrote:
>> >
>> > Hi All!
>> >
>> > We're running a statefun uber jar on a shared cloudera flink cluster,
>> the latter of which launches with some ancient protobuf dependencies
>> because of reasons[1].
>> >
>> > Setting the following flink-config settings on the entire cluster
>> >
>> > classloader.parent-first-patterns.additional:
>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>> >
>> > causes these old protobuf dependencies to get loaded over statefun's
>> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>> >
>> > We hacked together a version of statefun that doesn't perform the check
>> whether the classloader settings contain the three patterns from above, and
>> as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf
>> pattern is not present in the classloader.parent-first-patterns.additional
>> setting, then all is well.
>> >
>> > Aside from removing old hadoop from the classpath, which may not be
>> possible given that it's a shared cluster, is there anything we can do
>> other than adding a configurable override not to perform the config check
>> in StatefulFunctionsConfigValidator to an upcoming statefun core release?
>> >
>> > Many thanks
>> > Fil
>> >
>> >
>> > [1] We're still trying to find out if it's absolutely necessary to have
>> these on the classpath.
>>
>


Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-08 Thread Filip Karnicki
Hi Roman, Igal (@ below)

Thank you for your answer. I don't think I'll have access to flink's lib
folder given it's a shared Cloudera cluster. The only thing I could think
of is to not include com.google.protobuf in the
classloader.parent-first-patterns.additional setting, and
including protobuf-java 3.7.1 in the uber jar.

I created a jira for this just now + a discuss thread on the dev group
https://issues.apache.org/jira/browse/FLINK-26537

Hi @Igal Shilman  , is the plugin solution outlined
by Roman something that fits in better with Statefun than having the
creators of uber .jars be responsible for using a statefun-compatible
protobuf-java?

Kind regards
Fil

On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan  wrote:

> Hi Filip,
>
> Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
> Or maybe re-writing the dependencies you mentioned to be loaded as
> plugins? [1]
>
> I don't see any other ways to solve this problem.
> Probably Chesnay or Seth will suggest a better solution.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
>
> Regards,
> Roman
>
> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki 
> wrote:
> >
> > Hi All!
> >
> > We're running a statefun uber jar on a shared cloudera flink cluster,
> the latter of which launches with some ancient protobuf dependencies
> because of reasons[1].
> >
> > Setting the following flink-config settings on the entire cluster
> >
> > classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> >
> > causes these old protobuf dependencies to get loaded over statefun's
> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
> >
> > We hacked together a version of statefun that doesn't perform the check
> whether the classloader settings contain the three patterns from above, and
> as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf
> pattern is not present in the classloader.parent-first-patterns.additional
> setting, then all is well.
> >
> > Aside from removing old hadoop from the classpath, which may not be
> possible given that it's a shared cluster, is there anything we can do
> other than adding a configurable override not to perform the config check
> in StatefulFunctionsConfigValidator to an upcoming statefun core release?
> >
> > Many thanks
> > Fil
> >
> >
> > [1] We're still trying to find out if it's absolutely necessary to have
> these on the classpath.
>


Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-08 Thread Roman Khachatryan
Hi Filip,

Have you tried putting protobuf-java 3.7.1 into the Flink's lib/ folder?
Or maybe re-writing the dependencies you mentioned to be loaded as plugins? [1]

I don't see any other ways to solve this problem.
Probably Chesnay or Seth will suggest a better solution.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/


Regards,
Roman

On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki  wrote:
>
> Hi All!
>
> We're running a statefun uber jar on a shared cloudera flink cluster, the 
> latter of which launches with some ancient protobuf dependencies because of 
> reasons[1].
>
> Setting the following flink-config settings on the entire cluster
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
> causes these old protobuf dependencies to get loaded over statefun's 
> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>
> We hacked together a version of statefun that doesn't perform the check 
> whether the classloader settings contain the three patterns from above, and 
> as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf
> pattern is not present in the classloader.parent-first-patterns.additional 
> setting, then all is well.
>
> Aside from removing old hadoop from the classpath, which may not be possible 
> given that it's a shared cluster, is there anything we can do other than 
> adding a configurable override not to perform the config check in 
> StatefulFunctionsConfigValidator to an upcoming statefun core release?
>
> Many thanks
> Fil
>
>
> [1] We're still trying to find out if it's absolutely necessary to have these 
> on the classpath.


[statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-04 Thread Filip Karnicki
Hi All!

We're running a statefun uber jar on a shared cloudera flink cluster,
the latter of which launches with some ancient protobuf dependencies
because of reasons[1].

Setting the following flink-config settings on the entire cluster

classloader.parent-first-patterns.additional:
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf

causes these old protobuf dependencies to get loaded over statefun's
protobuf-java 3.7.1, and NoSuchMethod exceptions occur.

We hacked together a version of statefun that skips the check for
whether the classloader settings contain the three patterns above. As
long as our job uses protobuf-java 3.7.1 and the com.google.protobuf
pattern is not present in the classloader.parent-first-patterns.additional
setting, all is well.
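
The effect of the pattern list can be illustrated with a small sketch. It assumes that a class is resolved parent-first when its fully qualified name starts with one of the configured patterns; the class and method names are made up for illustration and this is not Flink's class loader implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of prefix-based parent-first resolution (assumption, see lead-in).
class ParentFirstSketch {

    // Splits a semicolon-separated pattern list such as the setting quoted above.
    static List<String> parsePatterns(String setting) {
        List<String> patterns = new ArrayList<>();
        for (String raw : setting.split(";")) {
            String pattern = raw.trim();
            if (!pattern.isEmpty()) {
                patterns.add(pattern);
            }
        }
        return patterns;
    }

    // True if the class name starts with any configured pattern, meaning the
    // cluster's copy of the class would win over the copy in the uber jar.
    static boolean isParentFirst(String className, List<String> patterns) {
        for (String pattern : patterns) {
            if (className.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> withProtobuf = parsePatterns(
                "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
        List<String> withoutProtobuf = parsePatterns(
                "org.apache.flink.statefun;org.apache.kafka");
        String protobufClass = "com.google.protobuf.Message";
        System.out.println(isParentFirst(protobufClass, withProtobuf));
        System.out.println(isParentFirst(protobufClass, withoutProtobuf));
    }
}
```

With the com.google.protobuf pattern present, protobuf classes come from the cluster classpath (the old version); without it, the job's own protobuf-java 3.7.1 is used.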

Aside from removing old hadoop from the classpath, which may not be
possible given that it's a shared cluster, is there anything we can do
other than adding a configurable override not to perform the config check
in StatefulFunctionsConfigValidator to an upcoming statefun core release?

Many thanks
Fil


[1] We're still trying to find out if it's absolutely necessary to have
these on the classpath.


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2022-01-03 Thread David Morávek
As there were no strong objections, we'll proceed with bumping the Hadoop
version to 2.8.5 and removing the safeguards and the CI for any earlier
versions. This will effectively make Hadoop 2.8.5 the minimum supported
version in Flink 1.15.

Best,
D.

On Thu, Dec 23, 2021 at 11:03 AM Till Rohrmann  wrote:

> If there are no users strongly objecting to dropping Hadoop support for <
> 2.8, then I am +1 for this since otherwise we won't gain a lot as Xintong
> said.
>
> Cheers,
> Till
>
> On Wed, Dec 22, 2021 at 10:33 AM David Morávek  wrote:
>
> > Agreed, if we drop the CI for lower versions, there is actually no point
> > of having safeguards as we can't really test for them.
> >
> > One more thought (it's more of a feeling): users running really old
> > Hadoop versions are usually slower to adopt new releases (they most likely
> > use whatever their current HDP / CDH version offers), so they are less
> > likely to use Flink 1.15 any time soon, but I don't have any strong data
> > to support this.
> >
> > D.
> >
>


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-23 Thread Till Rohrmann
If there are no users strongly objecting to dropping Hadoop support for <
2.8, then I am +1 for this since otherwise we won't gain a lot as Xintong
said.

Cheers,
Till

On Wed, Dec 22, 2021 at 10:33 AM David Morávek  wrote:

> Agreed, if we drop the CI for lower versions, there is actually no point
> of having safeguards as we can't really test for them.
>
> One more thought (it's more of a feeling): users running really old
> Hadoop versions are usually slower to adopt new releases (they most likely
> use whatever their current HDP / CDH version offers), so they are less
> likely to use Flink 1.15 any time soon, but I don't have any strong data to
> support this.
>
> D.
>


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-22 Thread David Morávek
Agreed, if we drop the CI for lower versions, there is actually no point of
having safeguards as we can't really test for them.

One more thought (it's more of a feeling): users running really old
Hadoop versions are usually slower to adopt new releases (they most likely
use whatever their current HDP / CDH version offers), so they are less
likely to use Flink 1.15 any time soon, but I don't have any strong data to
support this.

D.


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-21 Thread Xintong Song
Sorry to join the discussion late.

+1 for dropping support for hadoop versions < 2.8 from my side.

TBH, wrapping the reflection-based logic with safeguards sounds a bit
neither fish nor fowl to me. It weakens the major benefits that we look for
by dropping support for early versions.
- The codebase is simplified, but not significantly. We still have the
complexity of understanding which APIs may not exist in early versions.
- Without CI, we provide no guarantee that Flink will still work with early
hadoop versions. Or otherwise we fail to simplify the CI.
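
For context, the kind of safeguard under discussion could look roughly like the sketch below: a reflective lookup that treats a missing class, a missing method, or a link failure (such as NoClassDefFoundError) as "feature not available" instead of failing. The helper name is hypothetical and this is not the code in Flink.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical helper illustrating a reflection safeguard.
class ReflectionSafeguard {

    // Returns the method if it exists on the running classpath, or empty when
    // the class or method is missing or the class cannot be linked.
    static Optional<Method> findOptionalMethod(
            String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class<?> clazz = Class.forName(className);
            return Optional.of(clazz.getMethod(methodName, parameterTypes));
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so it is covered here too.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(findOptionalMethod("java.lang.String", "length").isPresent());
        System.out.println(findOptionalMethod("org.example.Missing", "anything").isPresent());
    }
}
```

Every call site guarded this way is a code path that only runs on some Hadoop versions, which is why such safeguards are hard to trust without CI coverage for those versions.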

I'd suggest saying that we no longer support hadoop versions < 2.8 at all. And
if our users cannot accept that, we may consider keeping the codebase
as is and waiting a bit longer.

WDYT?

Thank you~

Xintong Song


[1]
https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility

On Wed, Dec 22, 2021 at 12:52 AM David Morávek  wrote:

> CC user@f.a.o
>
> Is anyone aware of something that blocks us from doing the upgrade?
>
> D.
>
> On Tue, Dec 21, 2021 at 5:50 PM David Morávek 
> wrote:
>
>> Hi Martijn,
>>
>> From personal experience, most Hadoop users are lagging behind the release
>> lines by a lot, because upgrading a Hadoop cluster is not really a simple
>> task to achieve. I think for now we can stay a bit conservative; nothing
>> blocks us from using 2.8.5, as we don't use any "newer" APIs in the code.
>>
>> As for Till's concern, we can still wrap the reflection based logic, to
>> be skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do
>> now.
>>
>> D.
>>
>>
>> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser 
>> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is
>>> considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>>
>>> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>>
>>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann 
>>> wrote:
>>>
>>> > Hi David,
>>> >
>>> > I think we haven't updated our Hadoop dependencies in a long time.
>>> Hence,
>>> > it is probably time to do so. So +1 for upgrading to the latest patch
>>> > release.
>>> >
>>> > If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then
>>> I
>>> > don't see a problem with dropping support for pre-bundled Hadoop
>>> versions <
>>> > 2.8. This could indeed help us decrease our build matrix a bit and,
>>> thus,
>>> > saving some build time.
>>> >
>>> > Concerning simplifying our code base to get rid of reflection logic
>>> etc. we
>>> > still might have to add a safeguard for features that are not
>>> supported by
>>> > earlier versions. According to the docs
>>> >
>>> > > YARN applications that attempt to use new APIs (including new fields
>>> in
>>> > data structures) that have not yet been deployed to the cluster can
>>> expect
>>> > link exceptions
>>> >
>>> > we can see link exceptions. We could get around this by saying that
>>> Flink
>>> > no longer supports Hadoop < 2.8. But this should be checked with our
>>> users
>>> > on the user ML at least.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Dec 14, 2021 at 9:25 AM David Morávek  wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > I'd like to start a discussion about upgrading a minimal Hadoop
>>> version
>>> > > that Flink supports.
>>> > >
>>> > > Even though the default value for `hadoop.version` property is set to
>>> > > 2.8.3, we're still ensuring both runtime and compile compatibility
>>> with
>>> > > Hadoop 2.4.x with the scheduled pipeline[1].
>>> > >
>>> > > Here is list of dates of the latest releases for each minor version
>>> up to
>>> > > 2.8.x
>>> > >
>>> > > - Hadoop 2.4.1: Last commit on 6/30/2014
>>> > > - Hadoop 2.5.2: Last commit on 11/15/2014
>>> > > - Hadoop 2.6.5: Last commit on 10/11/2016
>>> > > - Hadoop 2.7.7: Last commit on 7/18/2018
>>> > > - Hadoop 2.8.5: Last commit on 

Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-21 Thread David Morávek
CC user@f.a.o

Is anyone aware of something that blocks us from doing the upgrade?

D.

On Tue, Dec 21, 2021 at 5:50 PM David Morávek 
wrote:

> Hi Martijn,
>
> From personal experience, most Hadoop users are lagging behind the release
> lines by a lot, because upgrading a Hadoop cluster is not really a simple
> task to achieve. I think for now we can stay a bit conservative; nothing
> blocks us from using 2.8.5, as we don't use any "newer" APIs in the code.
>
> As for Till's concern, we can still wrap the reflection based logic, to be
> skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do
> now.
>
> D.
>
>
> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser 
> wrote:
>
>> Hi David,
>>
>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is
>> considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>
>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann  wrote:
>>
>> > Hi David,
>> >
>> > I think we haven't updated our Hadoop dependencies in a long time.
>> Hence,
>> > it is probably time to do so. So +1 for upgrading to the latest patch
>> > release.
>> >
>> > If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then I
>> > don't see a problem with dropping support for pre-bundled Hadoop
>> versions <
>> > 2.8. This could indeed help us decrease our build matrix a bit and,
>> thus,
>> > saving some build time.
>> >
>> > Concerning simplifying our code base to get rid of reflection logic
>> etc. we
>> > still might have to add a safeguard for features that are not supported
>> by
>> > earlier versions. According to the docs
>> >
>> > > YARN applications that attempt to use new APIs (including new fields
>> in
>> > data structures) that have not yet been deployed to the cluster can
>> expect
>> > link exceptions
>> >
>> > we can see link exceptions. We could get around this by saying that
>> Flink
>> > no longer supports Hadoop < 2.8. But this should be checked with our
>> users
>> > on the user ML at least.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Dec 14, 2021 at 9:25 AM David Morávek  wrote:
>> >
>> > > Hi,
>> > >
>> > > I'd like to start a discussion about upgrading a minimal Hadoop
>> version
>> > > that Flink supports.
>> > >
>> > > Even though the default value for `hadoop.version` property is set to
>> > > 2.8.3, we're still ensuring both runtime and compile compatibility
>> with
>> > > Hadoop 2.4.x with the scheduled pipeline[1].
>> > >
>> > > Here is list of dates of the latest releases for each minor version
>> up to
>> > > 2.8.x
>> > >
>> > > - Hadoop 2.4.1: Last commit on 6/30/2014
>> > > - Hadoop 2.5.2: Last commit on 11/15/2014
>> > > - Hadoop 2.6.5: Last commit on 10/11/2016
>> > > - Hadoop 2.7.7: Last commit on 7/18/2018
>> > > - Hadoop 2.8.5: Last commit on 9/8/2018
>> > >
>> > > Since then there were two more minor releases in 2.x branch and four
>> more
>> > > minor releases in 3.x branch.
>> > >
>> > > Supporting the older version involves reflection-based "hacks" for
>> > > supporting multiple versions.
>> > >
>> > > My proposal would be changing the minimum supported version *to
>> 2.8.5*.
>> > > This should simplify the hadoop related codebase and simplify the CI
>> > build
>> > > infrastructure as we won't have to test for the older versions.
>> > >
>> > > Please note that this only involves a minimal *client side*
>> > compatibility.
>> > > The wire protocol should remain compatible with earlier versions [2],
>> so
>> > we
>> > > should be able to talk with any servers in 2.x major branch.
>> > >
>> > > One small note for the 2.8.x branch, some of the classes we need are
>> only
>> > > available in 2.8.4 version and above, but I'm not sure we should take
>> an
>> > > eventual need for upgrading a patch version into consideration here,
>> > > because both 2.8.4 and 2.8.5 are pretty old.
>> > >
>> > > WDYT, is it already time to upgrade? Looking forward for any thoughts
>> on
>> > > the topic!
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/release-1.14.0/tools/azure-pipelines/build-apache-repo.yml#L123
>> > > [2]
>> > >
>> > >
>> >
>> https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility
>> > >
>> > > Best,
>> > > D.
>> > >
>> >
>>
>


Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options

2021-12-15 Thread Arvid Heise
Hi Timothy,

The issue would require refactoring the FileSystems abstraction to allow
multiple FileSystem objects of the same type to be configured
differently. While this would improve the code quality and enable such use
cases, I currently have no capacity to work on it or guide it.

If you are interested in working on it, I can try to find someone to
shepherd.
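
For reference, the "properties.*" pass-through mentioned in the quoted message below can be sketched as follows. This is only an illustration of the prefix-stripping idea, not code from FileSystemTableFactory or the Kafka connector, and the "properties.fs.s3a.endpoint" table option is a hypothetical example of what such an option could look like.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a prefix-based pass-through; names are hypothetical.
class PropertiesPrefixSketch {

    static final String PREFIX = "properties.";

    // Collects all options that start with the prefix and strips the prefix,
    // leaving a map that could be handed to the underlying file system.
    static Map<String, String> extractPassThrough(Map<String, String> tableOptions) {
        Map<String, String> passThrough = new HashMap<>();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                passThrough.put(key.substring(PREFIX.length()), option.getValue());
            }
        }
        return passThrough;
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("connector", "filesystem");
        tableOptions.put("path", "s3a://example-bucket/table");
        tableOptions.put("properties.fs.s3a.endpoint", "http://localhost:9000"); // hypothetical option
        System.out.println(extractPassThrough(tableOptions));
    }
}
```

Extracting the properties is the easy part; the refactoring described above is needed so that each table's properties reach their own file system instance rather than a shared, globally configured one.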

On Mon, Dec 13, 2021 at 7:13 PM Timothy James  wrote:

> Thank you Timo.  Hi Arvid!
>
> I note that the ticket proposes two alternative solutions.  The first,
>
> > Either we allow a properties map similar to Kafka or Kinesis properties
> > to our connectors.
>
> seems to solve our problem.  The second, much more detailed, appears
> unrelated to our needs:
>
> > Or something like:
> > Management of two properties related S3 Object management:
> > ...
>
> The ticket is unassigned and has been open for more than a year.  It looks
> like you increased the ticket priority; thank you.
>
> Tim
>
> On Mon, Dec 13, 2021 at 6:52 AM Timo Walther  wrote:
>
>> Hi Timothy,
>>
>> Unfortunately, this is not supported yet. However, the effort is
>> tracked under the following ticket:
>>
>> https://issues.apache.org/jira/browse/FLINK-19589
>>
>> I will loop in Arvid (in CC), who might help you in contributing the
>> missing functionality.
>>
>> Regards,
>> Timo
>>
>>
>> On 10.12.21 23:48, Timothy James wrote:
>> > Hi,
>> >
>> > The Hadoop s3a library itself supports some properties we need, but the
>> > "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass
>> > connector options for these to the "Hadoop/Presto S3 File Systems
>> > plugins" (via S3FileSystemFactory).
>> >
>> > Instead, only Job-global Flink config values are passed to Hadoop s3a.
>> > That won't work for us: we need to vary these values per Flink SQL
>> > table, and not override our config for other use of S3 (such as Flink
>> > checkpointing).
>> >
>> > Contrast this with the Kafka connector, which supports an analogous
>> > "properties.*" prefixed pass-through mechanism, and the Kinesis
>> > connector, which supports all the specific properties we would need out
>> > of the box.
>> >
>> > Our current intent is to alter FileSystemTableFactory to follow the
>> > "properties.*" approach used by the Kafka connector.
>> >
>> > *** ➡️ Our questions for you: ⬅️
>> > - Know of anything like this? Anybody solved this?
>> > - Know of anything that's going to break this approach?
>> > - What are we missing?
>> >
>> > For context, our particular use case requires options like:
>> > - fs.s3a.assumed.role.arn
>> > - fs.s3a.aws.credentials.provider, (or some other mechanism to pass
>> > externalId)
>> >
>> > We imagine there would be other use cases for this, and if we build it
>> > ourselves there's the possibility of contributing it to the Flink repo
>> > for everybody.
>> >
>> > Relevant documentation:
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
>> > - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>> > - https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid
>> >
>> > Thank you!
>> >
>> > Tim James
>> > Decodable.co
>> >
>>
>>


Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options

2021-12-13 Thread Timothy James
Thank you Timo.  Hi Arvid!

I note that the ticket proposes two alternative solutions.  The first,

> Either we allow a properties map similar to Kafka or Kinesis properties
> to our connectors.

seems to solve our problem.  The second, much more detailed, appears
unrelated to our needs:

> Or something like:
> Management of two properties related S3 Object management:
> ...

The ticket is unassigned and has been open for more than a year.  It looks
like you increased the ticket priority; thank you.

Tim

On Mon, Dec 13, 2021 at 6:52 AM Timo Walther  wrote:

> Hi Timothy,
>
> Unfortunately, this is not supported yet. However, the effort is
> tracked under the following ticket:
>
> https://issues.apache.org/jira/browse/FLINK-19589
>
> I will loop in Arvid (in CC), who might help you in contributing the
> missing functionality.
>
> Regards,
> Timo
>
>
> On 10.12.21 23:48, Timothy James wrote:
> > Hi,
> >
> > The Hadoop s3a library itself supports some properties we need, but the
> > "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass
> > connector options for these to the "Hadoop/Presto S3 File Systems
> > plugins" (via S3FileSystemFactory).
> >
> > Instead, only Job-global Flink config values are passed to Hadoop s3a.
> > That won't work for us: we need to vary these values per Flink SQL
> > table, and not override our config for other use of S3 (such as Flink
> > checkpointing).
> >
> > Contrast this with the Kafka connector, which supports an analogous
> > "properties.*" prefixed pass-through mechanism, and the Kinesis
> > connector, which supports all the specific properties we would need out
> > of the box.
> >
> > Our current intent is to alter FileSystemTableFactory to follow the
> > "properties.*" approach used by the Kafka connector.
> >
> > *** ➡️ Our questions for you: ⬅️
> > - Know of anything like this? Anybody solved this?
> > - Know of anything that's going to break this approach?
> > - What are we missing?
> >
> > For context, our particular use case requires options like:
> > - fs.s3a.assumed.role.arn
> > - fs.s3a.aws.credentials.provider, (or some other mechanism to pass
> > externalId)
> >
> > We imagine there would be other use cases for this, and if we build it
> > ourselves there's the possibility of contributing it to the Flink repo
> > for everybody.
> >
> > Relevant documentation:
> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
> > - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> > - https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid
> >
> > Thank you!
> >
> > Tim James
> > Decodable.co
> >
>
>


Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options

2021-12-13 Thread Timo Walther

Hi Timothy,

Unfortunately, this is not supported yet. However, the effort is
tracked under the following ticket:


https://issues.apache.org/jira/browse/FLINK-19589

I will loop in Arvid (in CC), who might help you in contributing the
missing functionality.


Regards,
Timo


On 10.12.21 23:48, Timothy James wrote:

Hi,

The Hadoop s3a library itself supports some properties we need, but the 
"FileSystem SQL Connector" (via FileSystemTableFactory) does not pass 
connector options for these to the "Hadoop/Presto S3 File Systems 
plugins" (via S3FileSystemFactory).


Instead, only Job-global Flink config values are passed to Hadoop s3a.  
That won't work for us: we need to vary these values per Flink SQL 
table, and not override our config for other use of S3 (such as Flink 
checkpointing).


Contrast this with the Kafka connector, which supports an analogous 
"properties.*" prefixed pass-through mechanism, and the Kinesis 
connector, which supports all the specific properties we would need out 
of the box.


Our current intent is to alter FileSystemTableFactory to follow the 
"properties.*" approach used by the Kafka connector.


*** ➡️ Our questions for you: ⬅️
- Know of anything like this? Anybody solved this?
- Know of anything that's going to break this approach?
- What are we missing?

For context, our particular use case requires options like:
- fs.s3a.assumed.role.arn
- fs.s3a.aws.credentials.provider, (or some other mechanism to pass 
externalId)


We imagine there would be other use cases for this, and if we build it 
ourselves there's the possibility of contributing it to the Flink repo 
for everybody.


Relevant documentation:
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
- https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
- https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid


Thank you!

Tim James
Decodable.co





Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options

2021-12-10 Thread Timothy James
Hi,

The Hadoop s3a library itself supports some properties we need, but the
"FileSystem SQL Connector" (via FileSystemTableFactory) does not pass
connector options for these to the "Hadoop/Presto S3 File Systems plugins"
(via S3FileSystemFactory).

Instead, only Job-global Flink config values are passed to Hadoop s3a.
That won't work for us: we need to vary these values per Flink SQL table,
and not override our config for other use of S3 (such as Flink
checkpointing).

Contrast this with the Kafka connector, which supports an analogous
"properties.*" prefixed pass-through mechanism, and the Kinesis connector,
which supports all the specific properties we would need out of the box.

Our current intent is to alter FileSystemTableFactory to follow the
"properties.*" approach used by the Kafka connector.
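
To make the intended pass-through concrete, here is a minimal sketch of the prefix handling only, written against plain Java maps rather than Flink's real factory API. The class name, the helper, the bucket path and the role ARN are all hypothetical, and the "properties." prefix is assumed to behave as it does for the Kafka connector: matching options are forwarded with the prefix stripped, everything else is left to the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PropertiesPassThrough {

    // Assumed prefix, mirroring the Kafka connector's "properties.*" convention.
    static final String PREFIX = "properties.";

    /** Returns the options that start with PREFIX, with the prefix removed. */
    static Map<String, String> extractPassThrough(Map<String, String> tableOptions) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            String key = e.getKey();
            // Skip a bare "properties." key, which would map to an empty name.
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                result.put(key.substring(PREFIX.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "s3a://example-bucket/table/"); // made-up path
        options.put("properties.fs.s3a.assumed.role.arn",
                "arn:aws:iam::123456789012:role/example"); // made-up ARN
        // Only the fs.s3a.* entry would be handed to the Hadoop configuration.
        System.out.println(extractPassThrough(options));
    }
}
```

The open question from the thread remains: the extracted map still has to reach a file system instance that is configured per table rather than per job.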

*** ➡️ Our questions for you: ⬅️
- Know of anything like this? Anybody solved this?
- Know of anything that's going to break this approach?
- What are we missing?

For context, our particular use case requires options like:
- fs.s3a.assumed.role.arn
- fs.s3a.aws.credentials.provider, (or some other mechanism to pass
externalId)

We imagine there would be other use cases for this, and if we build it
ourselves there's the possibility of contributing it to the Flink repo for
everybody.

Relevant documentation:
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
- https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
- https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid

Thank you!

Tim James
Decodable.co


Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-08 Thread Natu Lauchande
Hey Ingo,

Thanks for the suggestion. It's definitely an issue with the Parquet
connector; when we try with the CSV or Blackhole connector, it's all fine.

I will try this approach and report back.

Thanks,
Natu

On Wed, Dec 8, 2021 at 7:02 PM Ingo Bürk  wrote:

> Hi Natu,
>
> Something you could try is removing the packaged parquet format and
> defining a custom format[1]. For this custom format you can then fix the
> dependencies by packaging all of the following into the format:
>
> * flink-sql-parquet
> * flink-shaded-hadoop-2-uber
> * hadoop-aws
> * aws-java-sdk-bundle
> * guava
>
> This isn't entirely straightforward, unfortunately, and I haven't
> verified it. However, with Ververica Platform 2.6, to be released shortly
> after Flink 1.15, it should also work again.
>
> [1]
> https://docs.ververica.com/user_guide/sql_development/connectors.html#custom-connectors-and-formats
>
>
> Best
> Ingo
>
> On Tue, Dec 7, 2021 at 6:23 AM Natu Lauchande 
> wrote:
>
>> Hey Timo and Flink community,
>>
>> I wonder if there is a fix for this issue. Last time, I rolled back to
>> Flink 1.12 and downgraded Ververica.
>>
>> I am really keen to leverage the new features in the latest versions of
>> Ververica (2.5+). I have tried a myriad of the suggested tricks (for
>> example, building the image with hadoop-client libraries), but I still get:
>>
>> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
>> at java.lang.Class.getDeclaredConstructors0(Native Method)
>> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
>> at java.lang.Class.getDeclaredConstructors(Class.java:2020)
>> at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1961)
>> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
>> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
>> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
>> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>> at org.a

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-08 Thread Ingo Bürk
Hi Natu,

Something you could try is removing the packaged parquet format and
defining a custom format[1]. For this custom format you can then fix the
dependencies by packaging all of the following into the format:

* flink-sql-parquet
* flink-shaded-hadoop-2-uber
* hadoop-aws
* aws-java-sdk-bundle
* guava

This isn't entirely straightforward, unfortunately, and I haven't verified
it. However, with Ververica Platform 2.6, to be released shortly after
Flink 1.15, it should also work again.

[1]
https://docs.ververica.com/user_guide/sql_development/connectors.html#custom-connectors-and-formats


Best
Ingo

On Tue, Dec 7, 2021 at 6:23 AM Natu Lauchande  wrote:

> Hey Timo and Flink community,
>
> I wonder if there is a fix for this issue. Last time, I rolled back to
> Flink 1.12 and downgraded Ververica.
>
> I am really keen to leverage the new features in the latest versions of
> Ververica (2.5+). I have tried a myriad of the suggested tricks (for
> example, building the image with hadoop-client libraries), but I still get:
>
> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1961)
> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTa

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-06 Thread Natu Lauchande
Hey Timo and Flink community,

I wonder if there is a fix for this issue. Last time, I rolled back to
Flink 1.12 and downgraded Ververica.

I am really keen to leverage the new features in the latest versions of
Ververica (2.5+). I have tried a myriad of the suggested tricks (for
example, building the image with hadoop-client libraries), but I still get:

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getDeclaredConstructors(Class.java:2020)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1961)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 48 more

This error occurs when writing to StreamFileWriting on S3 in Parquet
format.
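
One way to narrow down an error like this is to check whether the missing class is visible to a given class loader at all. The following is a diagnostic sketch only; the class and helper names are hypothetical, and the result is meaningful only if the check runs under the same class loader as the failing operator (for example from user code inside the job), since the trace above shows the lookup going through Flink's user-code class loader.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located by the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String name = "org.apache.hadoop.conf.Configuration";
        System.out.println(name + " loadable: " + isLoadable(name, loader));
    }
}
```

If the check returns false inside the job but true on the JobManager or client, the Hadoop classes are present somewhere in the image but not reachable from the class loader that deserializes the operator.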


Thanks,
Natu

On Thu, Jul 22, 2021 at 3:53 PM Timo Walther  wrote:

> Thanks, this should definit

Re: Replacing S3 Client in Hadoop plugin

2021-11-23 Thread Martijn Visser
Hi Tamir,

Thanks for providing the information. I don't know of a solution right
now; perhaps some other user has an idea. I do find your input valuable
for future improvements with regard to the S3 client in Hadoop.

Best regards,

Martijn

On Fri, 19 Nov 2021 at 09:21, Tamir Sagi 
wrote:

> Hey Martijn,
>
> Sorry for the late response.
>
> We wanted to replace the default client with our custom S3 client and not
> use the AmazonS3Client provided by the plugin.
>
> We used flink-s3-fs-hadoop v1.12.2, and for our needs we had to upgrade to
> v1.14.0 [1].
>
> The AmazonS3 client factory is initialized in [2]. If the property
> "fs.s3a.s3.client.factory.impl" [3] is not provided, the default factory is
> created [4]; it provides an AmazonS3Client, which does not support what we
> need.
> I know that both the property and the factory interface have been annotated
> with
>
> @InterfaceAudience.Private
> @InterfaceStability.Unstable
>
> since a very early version, but we found this solution cleaner than
> extending the whole class and overriding the #setAmazonS3Client method.
>
> Bottom line, all we had to do was create our own implementation of the
> S3ClientFactory interface [5], add
> s3.s3.client.factory.impl: <factory canonical name>
> to flink-conf.yaml, and place both the plugin and our artifact (with the
> factory and client implementation) under ${FLINK_HOME}/plugins/s3
>
> One important note: the flink-s3-fs-hadoop plugin includes the whole
> com.amazonaws.s3 source code. To avoid plugin class loader issues, we
> needed to remove the aws-s3-java-sdk dependency and declare the plugin
> dependency with scope "provided".
> If the job needs to do some work with S3, then shading com.amazonaws was
> also necessary.
>
> [1]
> https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.14.0
>
> [2]
> https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L264-L266
>
> [3]
> https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L366-L369
>
> [4]
> https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java#L66
>
> [5]
> https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
>
> Best,
> Tamir.
>
> --
> *From:* Martijn Visser 
> *Sent:* Wednesday, October 13, 2021 8:28 PM
> *To:* Tamir Sagi ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Replacing S3 Client in Hadoop plugin
>
> Hi,
>
> Could you elaborate on why you would like to replace the S3 client?
>
> Best regards,
>
> Martijn
>
> On Wed, 13 Oct 2021 at 17:18, Tamir Sagi 
> wrote:
>
> I found the dependency
>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-aws</artifactId>
>   <version>3.3.1</version>
> </dependency>
>
> Apparently it's possible; there is a method
> setAmazonS3Client
>
> I think I found the solution.
>
> Thanks.
>
> Tamir.
>
> --
> *From:* Tamir Sagi 
> *Sent:* Wednesday, October 13, 2021 5:44 PM
> *To:* user@flink.apache.org 
> *Subject:* Replacing S3 Client in Hadoop plugin
>
> Hey community.
>
> I would like to know if there is any way to replace the S3 client in the
> Hadoop plugin [1] with a custom client (AmazonS3).
>
> I did notice that the Hadoop plugin supports replacing the implementation of
> S3AFileSystem using "fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl"),
> but not the client itself [2]:
>
> <property>
>   <name>fs.s3a.impl</name>
>   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   <description>The implementation class of the S3A Filesystem</description>
> </property>
>
> I delved into the Hadoop plugin source code [3]; the client itself is of type
> AmazonS3Client and cannot be replaced with (for example) a client of
> type AmazonS3EncryptionV2.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> [2]
> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/inde

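flink1.12: how to configure multiple sets of Hadoop parameters (S3 usage question)

2021-11-22 Thread RS
hi,


Environment:
1. flink-1.12; the version can be upgraded
2. env.hadoop.conf.dir is set in flink-conf; that directory contains the HDFS
cluster's core-site.xml and hdfs-site.xml, and state.backend is stored on that HDFS
3. Flink is deployed in K8S + session mode


Requirement:
Read files from a distributed file system that speaks the S3 protocol, process
them, and write the results to MySQL


Problem:
The S3 settings use the Hadoop configuration style and are saved as a new
core-site.xml file, following
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A


According to the official documentation, the Hadoop environment variable has to
be changed, but that conflicts with the existing core-site.xml; two Hadoop
paths cannot be configured at the same time
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/


Alternatively, a pile of s3 settings can be added to flink-conf.yaml, but then
they are hard-coded; what should be done when another S3 cluster is added?


So how can this kind of problem be solved (modifying code is acceptable)? How
can multiple Hadoop configurations be set up (for example, reading data from a
first file system (S3 protocol) and writing it to a second file system (S3 protocol))?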





Replacing S3 Client in Hadoop plugin

2021-11-19 Thread Tamir Sagi
Hey Martijn,

Sorry for the late response.

We wanted to replace the default client with our custom S3 client and not use
the AmazonS3Client provided by the plugin.

We used flink-s3-fs-hadoop v1.12.2, and for our needs we had to upgrade to
v1.14.0 [1].

The AmazonS3 client factory is initialized in [2]. If the property
"fs.s3a.s3.client.factory.impl" [3] is not provided, the default factory is
created [4]; it provides an AmazonS3Client, which does not support what we need.
I know that both the property and the factory interface have been annotated with

@InterfaceAudience.Private
@InterfaceStability.Unstable

since a very early version, but we found this solution cleaner than extending
the whole class and overriding the #setAmazonS3Client method.

Bottom line, all we had to do was create our own implementation of the
S3ClientFactory interface [5], add
s3.s3.client.factory.impl: <factory canonical name>
to flink-conf.yaml, and place both the plugin and our artifact (with the
factory and client implementation) under
${FLINK_HOME}/plugins/s3

One important note:  Flink-s3-fs-hadoop plugin includes the whole 
com.amazonaws.s3 source code, to avoid plugin class loader issues, we needed to 
remove the aws-s3-java-sdk dependency and provide the plugin dependency with 
scope "provided".
If the jobs needs to do some work with S3,then shading com.amazonaws was also 
necessary.
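
The key naming used in this thread ("fs.s3a.impl" on the Hadoop side is written as "s3.impl" in flink-conf.yaml, and "fs.s3a.s3.client.factory.impl" as "s3.s3.client.factory.impl") can be illustrated with a small sketch. It is not Flink's actual configuration loader, only a stand-alone illustration of the prefix renaming. The class name S3KeyMapping and the factory class com.example.MyS3ClientFactory are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class S3KeyMapping {
    // Prefix used for S3 options in flink-conf.yaml, as described in this thread.
    static final String FLINK_PREFIX = "s3.";
    // Prefix under which the Hadoop S3A connector reads its options.
    static final String HADOOP_PREFIX = "fs.s3a.";

    /** Returns only the S3 options, renamed from the Flink prefix to the Hadoop prefix. */
    public static Map<String, String> toHadoopKeys(Map<String, String> flinkConf) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(FLINK_PREFIX)) {
                out.put(HADOOP_PREFIX + key.substring(FLINK_PREFIX.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        // Hypothetical factory class name, for illustration only.
        flinkConf.put("s3.s3.client.factory.impl", "com.example.MyS3ClientFactory");
        // Non-S3 options are ignored by the renaming.
        flinkConf.put("state.backend", "filesystem");
        System.out.println(toHadoopKeys(flinkConf));
    }
}
```

Only keys that start with "s3." are renamed here; everything else is dropped from the result, so the output contains just the options the S3A connector would see.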

[1] 
https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.14.0

[2] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L264-L266

[3] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L366-L369

[4] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java#L66

[5] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Best,
Tamir.


From: Martijn Visser 
Sent: Wednesday, October 13, 2021 8:28 PM
To: Tamir Sagi ; user@flink.apache.org 

Subject: Re: Replacing S3 Client in Hadoop plugin




Hi,

Could you elaborate on why you would like to replace the S3 client?

Best regards,

Martijn

On Wed, 13 Oct 2021 at 17:18, Tamir Sagi <tamir.s...@niceactimize.com> wrote:
I found the dependency


<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>3.3.1</version>
</dependency>


apparently its possible, there is a method
setAmazonS3Client

I think I found the solution.

Thanks.

Tamir.


From: Tamir Sagi <tamir.s...@niceactimize.com>
Sent: Wednesday, October 13, 2021 5:44 PM
To: user@flink.apache.org
Subject: Replacing S3 Client in Hadoop plugin

Hey community.

I would like to know if there is any way to replace the S3 client in Hadoop 
plugin[1] to a custom client(AmazonS3).

I did notice that Hadoop plugin supports replacing the implementation of 
S3AFileSystem using
"fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl") but not the client 
itself [2]

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

I delved into Hadoop plugin source code [3] , the Client itself is of type 
AmazonS3Client and cannot be replaced (for example) with a client of type 
AmazonS3EncryptionV2.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[2] 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
[3] 
https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Re: Replacing S3 Client in Hadoop plugin

2021-10-13 Thread Martijn Visser
Hi,

Could you elaborate on why you would like to replace the S3 client?

Best regards,

Martijn

On Wed, 13 Oct 2021 at 17:18, Tamir Sagi 
wrote:

> I found the dependency
>
> 
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-aws</artifactId>
>   <version>3.3.1</version>
> </dependency>
> 
>
> apparently its possible, there is a method
> setAmazonS3Client
>
> I think I found the solution.
>
> Thanks.
>
> Tamir.
>
> --
> *From:* Tamir Sagi 
> *Sent:* Wednesday, October 13, 2021 5:44 PM
> *To:* user@flink.apache.org 
> *Subject:* Replacing S3 Client in Hadoop plugin
>
> Hey community.
>
> I would like to know if there is any way to replace the S3 client in
> Hadoop plugin[1] to a custom client(AmazonS3).
>
> I did notice that Hadoop plugin supports replacing the implementation of
> S3AFileSystem using
> "fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl") but not the client
> itself [2]
>
> 
> <property>
>   <name>fs.s3a.impl</name>
>   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   <description>The implementation class of the S3A Filesystem</description>
> </property>
> 
>
> I delved into Hadoop plugin source code [3] , the Client itself is of type
> AmazonS3Client and cannot be replaced (for example) with a client of
> type AmazonS3EncryptionV2.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> [2]
> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
> [3]
> https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
>
> Thank you,
>
> Best,
> Tamir.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
>


Re: Replacing S3 Client in Hadoop plugin

2021-10-13 Thread Tamir Sagi
I found the dependency


<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>3.3.1</version>
</dependency>

Apparently it's possible; there is a method
setAmazonS3Client

I think I found the solution.

Thanks.

Tamir.



From: Tamir Sagi 
Sent: Wednesday, October 13, 2021 5:44 PM
To: user@flink.apache.org 
Subject: Replacing S3 Client in Hadoop plugin

Hey community.

I would like to know if there is any way to replace the S3 client in Hadoop 
plugin[1] to a custom client(AmazonS3).

I did notice that Hadoop plugin supports replacing the implementation of 
S3AFileSystem using
"fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl") but not the client 
itself [2]

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

I delved into Hadoop plugin source code [3] , the Client itself is of type 
AmazonS3Client and cannot be replaced (for example) with a client of type 
AmazonS3EncryptionV2.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[2] 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
[3] 
https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Thank you,

Best,
Tamir.






Replacing S3 Client in Hadoop plugin

2021-10-13 Thread Tamir Sagi
Hey community.

I would like to know whether there is any way to replace the S3 client in the Hadoop 
plugin [1] with a custom client (AmazonS3).

I did notice that the Hadoop plugin supports replacing the implementation of 
S3AFileSystem using
"fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl"), but not the client 
itself [2]:

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

I delved into the Hadoop plugin source code [3]; the client itself is of type 
AmazonS3Client and cannot be replaced with, for example, a client of type 
AmazonS3EncryptionV2.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[2] 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
[3] 
https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Thank you,

Best,
Tamir.





Re: Any one can help me? How to connect offline hadoop cluster and realtime hadoop cluster by different hive catalog?

2021-08-27 Thread Caizhi Weng
Hi!

It seems that your Flink cluster cannot connect to
realtime-cluster-master001/xx.xx.xx.xx:8050. Please check your network and
port status.
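
A quick way to check the port from the machine where the Flink job runs is a plain TCP connect with a timeout. The sketch below uses only the JDK; the class name PortCheck is hypothetical, and the default host and port are simply the ones from the log line in the question (the real address is redacted there, so pass actual values as arguments).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from the log line in the question; override via arguments.
        String host = args.length > 0 ? args[0] : "realtime-cluster-master001";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8050;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this reports false from the node that runs the job, the problem is on the network or service side rather than in the catalog configuration.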

Jim Chen  于2021年8月27日周五 下午2:20写道:

> Hi, All
>   My flink version is 1.13.1 and my company have two hadoop cluster,
> offline hadoop cluster and realtime hadoop cluster. Now, on realtime hadoop
> cluster, we want to submit flink job to connect offline hadoop cluster by
> different hive catalog. I use different hive configuration diretory in hive
> catalog configuration. The error log of flink job as follow:
>
>   2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
>   [] - Retrying connect to server:
> realtime-cluster-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
> policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000
> MILLISECONDS)。
>
>   Any one can help me? Thanks!
>


Any one can help me? How to connect offline hadoop cluster and realtime hadoop cluster by different hive catalog?

2021-08-27 Thread Jim Chen
Hi all,
  My Flink version is 1.13.1, and my company has two Hadoop clusters: an
offline Hadoop cluster and a realtime Hadoop cluster. We want to submit a
Flink job on the realtime cluster that connects to the offline cluster
through a different Hive catalog. I use a different Hive configuration
directory in the Hive catalog configuration. The error log of the Flink job
is as follows:

  2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
  [] - Retrying connect to server:
realtime-cluster-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000
MILLISECONDS)

  Can anyone help me? Thanks!


Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-23 Thread Flavio Pompermaier
Could this be related to https://issues.apache.org/jira/browse/FLINK-22414?

On Thu, Jul 22, 2021 at 3:53 PM Timo Walther  wrote:

> Thanks, this should definitely work with the pre-packaged connectors of
> Ververica platform.
>
> I guess we have to investigate what is going on. Until then, a
> workaround could be to add Hadoop manually and set the HADOOP_CLASSPATH
> environment variable. The root cause seems that Hadoop cannot be found.
>
> Alternatively, you could also build a custom image and include Hadoop in
> the lib folder of Flink:
>
> https://docs.ververica.com/v1.3/platform/installation/custom_images.html
>
> I hope this helps. I will get back to you if we have a fix ready.
>
> Regards,
> Timo
>
>
>
> On 22.07.21 14:30, Natu Lauchande wrote:
> > Sure.
> >
> > That's how the ddl table looks like:
> >
> > CREATE TABLE tablea (
> >   `a` BIGINT,
> >   `b` BIGINT,
> >   `c` BIGINT
> > )
> > COMMENT ''
> > WITH (
> >   'auto-compaction' = 'false',
> >   'connector' = 'filesystem',
> >   'format' = 'parquet',
> >   'parquet.block.size' = '134217728',
> >   'parquet.compression' = 'SNAPPY',
> >   'parquet.dictionary.page.size' = '1048576',
> >   'parquet.enable.dictionary' = 'true',
> >   'parquet.page.size' = '1048576',
> >   'parquet.writer.max-padding' = '2097152',
> >   'path' = 's3a://test/test',
> >   'sink.partition-commit.delay' = '1 h',
> >   'sink.partition-commit.policy.kind' = 'success-file',
> >   'sink.partition-commit.success-file.name' = '_SUCCESS',
> >   'sink.partition-commit.trigger' = 'process-time',
> >   'sink.rolling-policy.check-interval' = '20 min',
> >   'sink.rolling-policy.file-size' = '128MB',
> >   'sink.rolling-policy.rollover-interval' = '2 h'
> > );
> >
> >
> >
> > When a change the connector to a blackhole it immediately works without
> > errors. I have the redacted the names and paths.
> >
> >
> >
> > Thanks,
> > Natu
> >
> >
> > On Thu, Jul 22, 2021 at 2:24 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Maybe you can share also which connector/format you are using? What is
> > the DDL?
> >
> > Regards,
> > Timo
> >
> >
> > On 22.07.21 14:11, Natu Lauchande wrote:
> >  > Hey Timo,
> >  >
> >  > Thanks for the reply.
> >  >
> >  > No custom file as we are using Flink SQL and submitting the job
> > directly
> >  > through the SQL Editor UI. We are using Flink 1.13.1 as the
> > supported
> >  > flink version. No custom code all through Flink SQL on UI no jars.
> >  >
> >  > Thanks,
> >  > Natu
> >  >
> >  > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther  > <mailto:twal...@apache.org>
> >  > <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote:
> >  >
> >  > Hi Natu,
> >  >
> >  > Ververica Platform 2.5 has updated the bundled Hadoop version
> > but this
> >  > should not result in a NoClassDefFoundError exception. How
> > are you
> >  > submitting your SQL jobs? You don't use Ververica's SQL
> > service but
> >  > have
> >  > built a regular JAR file, right? If this is the case, can you
> > share
> >  > your
> >  > pom.xml file with us? The Flink version stays constant at
> 1.12?
> >  >
> >  > Regards,
> >  > Timo
> >  >
> >  > On 22.07.21 12:22, Natu Lauchande wrote:
> >  >  > Good day Flink community,
> >  >  >
> >  >  > Apache Flink/Ververica Community Edition - Question
> >  >  >
> >  >  >
> >  >  > I am having an issue with my Flink SQL jobs since updating
> >  > from Flink
> >  >  > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs
> > running on
> >  >  > parquet and S3 i am getting the following error
> continuously:
> >  >  >
> >  >  > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> >   

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther
Thanks, this should definitely work with the pre-packaged connectors of 
Ververica platform.


I guess we have to investigate what is going on. Until then, a 
workaround could be to add Hadoop manually and set the HADOOP_CLASSPATH 
environment variable. The root cause seems to be that Hadoop cannot be found.
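
To confirm that diagnosis in a given environment, a small check like the following can be run with the same classpath as the job. It is only a sketch using the JDK: the class name HadoopClasspathCheck is hypothetical, and the two Hadoop class names are the ones that typically show up in this kind of error.

```java
public class HadoopClasspathCheck {
    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, HadoopClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.fs.FileSystem"
        };
        for (String name : required) {
            System.out.println(name + " -> " + (isLoadable(name) ? "found" : "MISSING"));
        }
        // Prints null if the variable is not set for this process.
        System.out.println("HADOOP_CLASSPATH=" + System.getenv("HADOOP_CLASSPATH"));
    }
}
```

If both classes are reported as MISSING, the Hadoop jars are not visible to that JVM, which matches the NoClassDefFoundError in the stack trace.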


Alternatively, you could also build a custom image and include Hadoop in 
the lib folder of Flink:


https://docs.ververica.com/v1.3/platform/installation/custom_images.html

I hope this helps. I will get back to you if we have a fix ready.

Regards,
Timo



On 22.07.21 14:30, Natu Lauchande wrote:

Sure.

That's how the ddl table looks like:

CREATE TABLE tablea (
  `a` BIGINT,
  `b` BIGINT,
  `c` BIGINT
)
COMMENT ''
WITH (
  'auto-compaction' = 'false',
  'connector' = 'filesystem',
  'format' = 'parquet',
  'parquet.block.size' = '134217728',
  'parquet.compression' = 'SNAPPY',
  'parquet.dictionary.page.size' = '1048576',
  'parquet.enable.dictionary' = 'true',
  'parquet.page.size' = '1048576',
  'parquet.writer.max-padding' = '2097152',
  'path' = 's3a://test/test',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.rolling-policy.check-interval' = '20 min',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '2 h'
);



When a change the connector to a blackhole it immediately works without 
errors. I have the redacted the names and paths.




Thanks,
Natu


On Thu, Jul 22, 2021 at 2:24 PM Timo Walther <twal...@apache.org> wrote:


Maybe you can share also which connector/format you are using? What is
the DDL?

Regards,
Timo


On 22.07.21 14:11, Natu Lauchande wrote:
 > Hey Timo,
 >
 > Thanks for the reply.
 >
 > No custom file as we are using Flink SQL and submitting the job
directly
 > through the SQL Editor UI. We are using Flink 1.13.1 as the
supported
 > flink version. No custom code all through Flink SQL on UI no jars.
 >
 > Thanks,
 > Natu
 >
 > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther mailto:twal...@apache.org>
 > <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote:
 >
 >     Hi Natu,
 >
 >     Ververica Platform 2.5 has updated the bundled Hadoop version
but this
 >     should not result in a NoClassDefFoundError exception. How
are you
 >     submitting your SQL jobs? You don't use Ververica's SQL
service but
 >     have
 >     built a regular JAR file, right? If this is the case, can you
share
 >     your
 >     pom.xml file with us? The Flink version stays constant at 1.12?
 >
 >     Regards,
 >     Timo
 >
 >     On 22.07.21 12:22, Natu Lauchande wrote:
 >      > Good day Flink community,
 >      >
 >      > Apache Flink/Ververica Community Edition - Question
 >      >
 >      >
 >      > I am having an issue with my Flink SQL jobs since updating
 >     from Flink
 >      > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs
running on
 >      > parquet and S3 i am getting the following error continuously:
 >      >
 >      > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
 >      > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local
(dataPort=39309).
 >      >
 >      > java.lang.NoClassDefFoundError:
org/apache/hadoop/conf/Configuration
 >      >
 >      > at java.lang.Class.getDeclaredConstructors0(Native Method)
 >     ~[?:1.8.0_292]
 >      >
 >      > at
java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
 >      > ~[?:1.8.0_292]
 >      >
 >      > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
 >     ~[?:1.8.0_292]
 >      >
 >      > **
 >      >
 >      > at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
 >      > ~[?:1.8.0_292]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)

 >
 >      > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >      >
 >      > at
 >      >
 >   
  org.apache.flink.util.Instan

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Sure.

This is what the table DDL looks like:

CREATE TABLE tablea (
  `a` BIGINT,
  `b` BIGINT,
  `c` BIGINT
)
COMMENT ''
WITH (
  'auto-compaction' = 'false',
  'connector' = 'filesystem',
  'format' = 'parquet',
  'parquet.block.size' = '134217728',
  'parquet.compression' = 'SNAPPY',
  'parquet.dictionary.page.size' = '1048576',
  'parquet.enable.dictionary' = 'true',
  'parquet.page.size' = '1048576',
  'parquet.writer.max-padding' = '2097152',
  'path' = 's3a://test/test',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.rolling-policy.check-interval' = '20 min',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '2 h'
);

When I change the connector to blackhole, it immediately works without
errors. I have redacted the names and paths.



Thanks,
Natu

On Thu, Jul 22, 2021 at 2:24 PM Timo Walther  wrote:

> Maybe you can share also which connector/format you are using? What is
> the DDL?
>
> Regards,
> Timo
>
>
> On 22.07.21 14:11, Natu Lauchande wrote:
> > Hey Timo,
> >
> > Thanks for the reply.
> >
> > No custom file as we are using Flink SQL and submitting the job directly
> > through the SQL Editor UI. We are using Flink 1.13.1 as the supported
> > flink version. No custom code all through Flink SQL on UI no jars.
> >
> > Thanks,
> > Natu
> >
> > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Hi Natu,
> >
> > Ververica Platform 2.5 has updated the bundled Hadoop version but this
> > should not result in a NoClassDefFoundError exception. How are you
> > submitting your SQL jobs? You don't use Ververica's SQL service but have
> > built a regular JAR file, right? If this is the case, can you share your
> > pom.xml file with us? The Flink version stays constant at 1.12?
> >
> > Regards,
> > Timo
> >
> > On 22.07.21 12:22, Natu Lauchande wrote:
> >  > Good day Flink community,
> >  >
> >  > Apache Flink/Ververica Community Edition - Question
> >  >
> >  >
> >  > I am having an issue with my Flink SQL jobs since updating
> > from Flink
> >  > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
> >  > parquet and S3 i am getting the following error continuously:
> >  >
> >  > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> >  > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local
> (dataPort=39309).
> >  >
> >  > java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
> >  >
> >  > at java.lang.Class.getDeclaredConstructors0(Native Method)
> > ~[?:1.8.0_292]
> >  >
> >  > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> >  > ~[?:1.8.0_292]
> >  >
> >  > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> > ~[?:1.8.0_292]
> >  >
> >  > **
> >  >
> >  > at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> >  > ~[?:1.8.0_292]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  > at
> >  >
> >
>  
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
> >
> >  > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >  >
> >  &

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther
Maybe you can share also which connector/format you are using? What is 
the DDL?


Regards,
Timo


On 22.07.21 14:11, Natu Lauchande wrote:

Hey Timo,

Thanks for the reply.

No custom file as we are using Flink SQL and submitting the job directly 
through the SQL Editor UI. We are using Flink 1.13.1 as the supported 
flink version. No custom code all through Flink SQL on UI no jars.


Thanks,
Natu

On Thu, Jul 22, 2021 at 2:08 PM Timo Walther <twal...@apache.org> wrote:


Hi Natu,

Ververica Platform 2.5 has updated the bundled Hadoop version but this
should not result in a NoClassDefFoundError exception. How are you
submitting your SQL jobs? You don't use Ververica's SQL service but have
built a regular JAR file, right? If this is the case, can you share your
pom.xml file with us? The Flink version stays constant at 1.12?

Regards,
Timo

On 22.07.21 12:22, Natu Lauchande wrote:
 > Good day Flink community,
 >
 > Apache Flink/Ververica Community Edition - Question
 >
 > I am having an issue with my Flink SQL jobs since updating from Flink
 > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
 > parquet and S3 i am getting the following error continuously:
 >
 > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
 > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).
 >
 > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
 > at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
 > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[?:1.8.0_292]
 > at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]
 > **
 > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_292]
 > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 > at
 >

org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181)

 > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
 >
 &

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Hey Timo,

Thanks for the reply.

No custom file as we are using Flink SQL and submitting the job directly
through the SQL Editor UI. We are using Flink 1.13.1 as the supported flink
version. No custom code all through Flink SQL on UI no jars.

Thanks,
Natu

On Thu, Jul 22, 2021 at 2:08 PM Timo Walther  wrote:

> Hi Natu,
>
> Ververica Platform 2.5 has updated the bundled Hadoop version but this
> should not result in a NoClassDefFoundError exception. How are you
> submitting your SQL jobs? You don't use Ververica's SQL service but have
> built a regular JAR file, right? If this is the case, can you share your
> pom.xml file with us? The Flink version stays constant at 1.12?
>
> Regards,
> Timo
>
> On 22.07.21 12:22, Natu Lauchande wrote:
> > Good day Flink community,
> >
> > Apache Flink/Ververica Community Edition - Question
> >
> >
> > I am having an issue with my Flink SQL jobs since updating from Flink
> > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on
> > parquet and S3 i am getting the following error continuously:
> >
> > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).
> >
> > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> >
> > at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
> >
> > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> > ~[?:1.8.0_292]
> >
> > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> ~[?:1.8.0_292]
> >
> > **
> >
> > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> > ~[?:1.8.0_292]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181)
>
> > ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]
> >
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTas

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Timo Walther

Hi Natu,

Ververica Platform 2.5 has updated the bundled Hadoop version but this 
should not result in a NoClassDefFoundError exception. How are you 
submitting your SQL jobs? You don't use Ververica's SQL service but have 
built a regular JAR file, right? If this is the case, can you share your 
pom.xml file with us? The Flink version stays constant at 1.12?


Regards,
Timo

On 22.07.21 12:22, Natu Lauchande wrote:

Good day Flink community,

Apache Flink/Ververica Community Edition - Question


I am having an issue with my Flink SQL jobs since updating from Flink 
1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on 
parquet and S3 i am getting the following error continuously:


INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @ 
10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).


java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]

at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) 
~[?:1.8.0_292]


at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]

**

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) 
~[?:1.8.0_292]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.conf.Configuration


at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292]

at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) 
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]


at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) 
~[flink

Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-22 Thread Natu Lauchande
Good day Flink community,

Apache Flink/Ververica Community Edition - Question


I am having an issue with my Flink SQL jobs since updating from Flink
1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs running on parquet
and S3 i am getting the following error continuously:

INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

at java.lang.Class.getDeclaredConstructors0(Native Method)
~[?:1.8.0_292]

at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
~[?:1.8.0_292]

at java.lang.Class.getDeclaredConstructors(Class.java:2020)
~[?:1.8.0_292]

**

   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
~[?:1.8.0_292]

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.conf.Configuration

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_292]

at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]

at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1[]

at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292]

... 57 more

2021-07-22 09:38:43,095 DEBUG org.apache.flink.runtime.scheduler.SharedSlot
  [] - Remove logical slot
(SlotRequestId{4297879e795d0516e36a7c26ccc795b2}) for execution vertex (id
cbc357ccb763df2852fee8c4fc7d55f2_0) from
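
A quick way to narrow down a ClassNotFoundException like the one above is to check which jars in a given directory actually ship the missing class. The following is only a minimal sketch: the function name `jars_providing` and the default directory `lib` are assumptions for illustration, and it says nothing about how Ververica Platform assembles its classpath.

```python
import sys
import zipfile
from pathlib import Path
from typing import List


def jars_providing(class_name: str, lib_dir: str) -> List[str]:
    """Return the names of the jars in lib_dir that contain class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives.
            continue
    return hits


if __name__ == "__main__":
    # Hypothetical default: a "lib" directory next to the script.
    lib = sys.argv[1] if len(sys.argv) > 1 else "lib"
    print(jars_providing("org.apache.hadoop.conf.Configuration", lib))
```

An empty result for the directory that the task manager loads from would be consistent with the error in the trace, but a non-empty result does not prove the class is visible to the user-code class loader.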

Re: Failure running Flink locally with flink-s3-fs-hadoop + AWS SDK v2 as a dependency

2021-07-20 Thread Yaroslav Tkachenko
Hi, sorry for resurrecting the old thread, but I have precisely the same issue 
with a different filesystem.

I've tried using plugins dir, setting FLINK_PLUGINS_DIR, etc. - nothing works 
locally. I added a breakpoint to the PluginConfig.getPluginsDir method and 
confirmed it's not even called. 

Could this be a LocalStreamEnvironment limitation? Is there any way to enable 
plugin loading locally?

Thanks!
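
Before debugging the loader itself, it can help to confirm that the plugins directory has the layout described in the quoted reply below (one subfolder per plugin, with the jar inside it). This is a minimal sketch under that assumption; `list_plugins` is a hypothetical helper, and it only inspects the directory. It does not answer whether LocalStreamEnvironment loads plugins.

```python
import os
from pathlib import Path
from typing import Dict, List, Optional


def list_plugins(plugins_dir: Optional[str] = None) -> Dict[str, List[str]]:
    """Map each plugin subfolder to the jar files it contains."""
    # FLINK_PLUGINS_DIR is the variable named in the thread; falling back to
    # "plugins" under the working directory is an assumption of this sketch.
    root = Path(plugins_dir or os.environ.get("FLINK_PLUGINS_DIR", "plugins"))
    found: Dict[str, List[str]] = {}
    if not root.is_dir():
        return found
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        # Jars placed directly in the root are ignored on purpose:
        # only jars inside a per-plugin subfolder are counted.
        found[sub.name] = sorted(j.name for j in sub.glob("*.jar"))
    return found


if __name__ == "__main__":
    for name, jars in list_plugins().items():
        print(name, jars)
```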

On 2021/06/21 11:13:29, Yuval Itzchakov  wrote: 
> Currently I have the s3-hadoop dependency in my build.sbt.
> 
> I guess I need to move it to the PLUGIN directory locally as well, didn't
> think of that.
> 
> On Mon, Jun 21, 2021, 13:35 Dawid Wysakowicz  wrote:
> 
> > Hi,
> >
> > Where do you use the AWS SDK v2? Filesystems should use the plugins
> > subsystem[1]. Therefore when running from within the IDE make sure you have
> > the s3 plugin in ${WORKING_DIR}/plugins/s3 and put the flink-s3-fs-hadoop
> > dependency there. Or specify a different directory for plugins via the ENV
> > variable: FLINK_PLUGINS_DIR.
> >
> > If you want to use the AWS SDK v2 for the s3 filesystem I guess you need
> > to build the jar yourself.
> >
> > Best,
> >
> > Dawid
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
> >
> >
> > On 20/06/2021 15:00, Yuval Itzchakov wrote:
> >
> > Hi,
> >
> > I'm trying to run Flink with AWS SDK v2 and restore a Flink savepoint
> > that's stored in S3.
> >
> > When adding the dependency on flink-s3-fs-hadoop locally, I fail to run
> > Flink at runtime with the following error:
> >
> > Caused by: java.lang.invoke.LambdaConversionException: Invalid receiver
> > type interface org.apache.http.Header; not a subtype of implementation type
> > interface org.apache.http.NameValuePair
> >
> > [image: image.png]
> >
> > This happens because the flink-s3 JAR is a fat JAR containing an old
> > version of org.apache.http.HttpMessage which does not implement the
> > NameValuePair interface (there was a non backwards compatible change in the
> > library, see https://github.com/aws/aws-sdk-java-v2/issues/652)
> >
> > [image: image.png]
> >
> > Since the library itself isn't shaded, it gets loaded first overriding the
> > HttpClient newer version I have on the classpath and blows up when
> > bootstrapping Flink.
> >
> > Any advice on how to work around this?
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> >
> 
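
The conflict described in the original message (an unshaded class inside a fat jar shadowing a newer copy on the classpath) can be made visible by listing classes that appear in more than one jar. The sketch below assumes you pass it the jar paths yourself; `duplicate_classes` and the example prefix are hypothetical names chosen for illustration.

```python
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List


def duplicate_classes(jars: Iterable[str], prefix: str = "") -> Dict[str, List[str]]:
    """Return class entries under prefix that occur in more than one jar."""
    owners: Dict[str, List[str]] = defaultdict(list)
    for jar in jars:
        with zipfile.ZipFile(jar) as zf:
            # set() guards against a jar listing the same entry twice.
            for name in set(zf.namelist()):
                if name.endswith(".class") and name.startswith(prefix):
                    owners[name].append(Path(jar).name)
    return {name: js for name, js in owners.items() if len(js) > 1}


if __name__ == "__main__":
    import sys

    # Example: python dupes.py fat.jar httpcore.jar
    for cls, where in sorted(duplicate_classes(sys.argv[1:], "org/apache/http/").items()):
        print(cls, where)
```

Which copy wins at runtime depends on classpath order and class loader setup, so the output only shows where a clash is possible.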


Re: Re:Re: Re: Flink 1.11.1 on k8s: how to configure Hadoop

2021-05-23 Thread mts_geek
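Hello, I ran into this problem as well. Could you tell me exactly how you built the image?
In my Dockerfile I added:
COPY --chown=flink:flink jars/flink-shaded-hadoop-2-2.8.3-10.0.jar $FLINK_HOME/lib/

However, when I run a Flink 1.11 session cluster on k8s,
the jobserver starts, but submitting a job fails with an error saying that HadoopUtils could not be initialized,
even though flink-shaded-hadoop-2-2.8.3-10.0.jar does contain the HadoopUtils class.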


Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.flink.runtime.util.HadoopUtils

<http://apache-flink.147419.n8.nabble.com/file/t1510/WX20210522-115421%402x.png>
 






Flink Hive connector: hive-conf-dir supports hdfs URI, while hadoop-conf-dir supports local path only?

2021-04-26 Thread Yik San Chan
Hi community,

This question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67264156/flink-hive-connector-hive-conf-dir-supports-hdfs-uri-while-hadoop-conf-dir-sup

In my current setup, the local dev env can access the testing env. I would like to
run a Flink job on the local dev env while reading/writing data from/to Hive in the
testing env.

This is what I do:

```
CREATE CATALOG hive WITH (
'type' = 'hive',
'hive-conf-dir' = 'hdfs://testhdp273/hive/conf'
)
```

However, I realize I also need to specify a matching Hadoop classpath,
so I also want to define `hadoop-conf-dir` to point to the Hadoop
classpath in the testing env. However, as stated in the [docs](
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/#hadoop-conf-dir
):

> Path to Hadoop conf dir. Only local file system paths are supported. The
recommended way to set Hadoop conf is via the HADOOP_CONF_DIR environment
variable. Use the option only if environment variable doesn't work for you,
e.g. if you want to configure each HiveCatalog separately.

I wonder why hadoop-conf-dir only supports local paths, while hive-conf-dir
accepts any valid HDFS path? Is there a workaround for this problem?

Any help? Thanks!

Best,
Yik San
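
One possible workaround, offered as an assumption rather than something confirmed in this thread, is to copy the Hadoop configuration files to the local machine first (for example with `hdfs dfs -get`) and point `hadoop-conf-dir` at that local copy. The sketch below only builds the DDL string and rejects non-local values up front; `hive_catalog_ddl` and the path `/opt/hadoop/conf` are hypothetical.

```python
def hive_catalog_ddl(name: str, hive_conf_dir: str, hadoop_conf_dir: str) -> str:
    """Build a CREATE CATALOG statement with both conf-dir options set."""
    # Per the quoted docs, hadoop-conf-dir accepts local file system paths
    # only, so anything that looks like a URI (hdfs://, s3://, ...) is rejected.
    if "://" in hadoop_conf_dir:
        raise ValueError(
            "hadoop-conf-dir must be a local path, got: " + hadoop_conf_dir
        )
    return (
        f"CREATE CATALOG {name} WITH (\n"
        "  'type' = 'hive',\n"
        f"  'hive-conf-dir' = '{hive_conf_dir}',\n"
        f"  'hadoop-conf-dir' = '{hadoop_conf_dir}'\n"
        ")"
    )


if __name__ == "__main__":
    # /opt/hadoop/conf is a placeholder for wherever the files were copied.
    print(hive_catalog_ddl("hive", "hdfs://testhdp273/hive/conf", "/opt/hadoop/conf"))
```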


Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Matthias Pohl
I think you're right, Flavio. I created FLINK-22414 [1] to cover this. Thanks
for bringing it up.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22414

On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
wrote:

> Hi Yang,
> isn't this something to fix? If I look at the documentation at  [1], in
> the "Passing configuration via environment variables" section, there is:
>
> "The environment variable FLINK_PROPERTIES should contain a list of Flink
> cluster configuration options separated by new line,
> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
> over configurations in flink-conf.yaml."
>
> To me this means that if I specify "env.hadoop.conf.dir" it should be
> handled as well. Am I wrong?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>
> Best,
> Flavio
>
> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>
>> It seems that the current implementation does not export HADOOP_CONF_DIR as
>> an environment variable, even when the env.xxx Flink config options are set [1].
>> The option is only used to construct the classpath for the JM/TM process.
>> However, "HadoopUtils" [2] does not support reading the Hadoop
>> configuration from the classpath.
>>
>>
>> [1].
>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>> [2].
>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>
>>
>> Best,
>> Yang
>>
>>
>> On Fri, Apr 16, 2021 at 3:55 AM Flavio Pompermaier wrote:
>>
>>> Hi Robert,
>>> indeed, my docker-compose setup only works if I also add the Hadoop and
>>> YARN home, whereas I expected those two variables to be generated
>>> automatically just by setting the env.xxx options in FLINK_PROPERTIES.
>>>
>>> I just want to understand what to expect: do I really need to specify the
>>> Hadoop and YARN home as env variables or not?
>>>
>>> On Thu, Apr 15, 2021 at 8:39 PM Robert Metzger wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm not aware of any known issues with Hadoop and Flink on Docker.
>>>>
>>>> I also tried what you are doing locally, and it seems to work:
>>>>
>>>> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
>>>> StandaloneSessionClusterEntrypoint.
>>>> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>>>> default filesystem.
>>>> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>>>> security context.
>>>> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>>>>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
>>>> user set to flink (auth:SIMPLE)
>>>> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>>>>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
>>>> file will be created as /tmp/jaas-811306162058602256.conf.
>>>> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>>> Initializing cluster services.
>>>>
>>>> Here's my code:
>>>>
>>>> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>>>>
>>>> Hope this helps!
>>>>
>>>> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi everybody,
>>>>> I'm trying to set up reading from HDFS using docker-compose and Flink
>>>>> 1.11.3.
>>>>> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
>>>>> using FLINK_PROPERTIES (under environment section of the docker-compose
>>>>> service) I see in the logs the following line:
>>>>>
>>>>> "Could not find Hadoop configuration via any of the supported method"
>>>>>
>>>>> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
>>>>> generated by the run scripts.
>>>>> I

Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Flavio Pompermaier
Great! Thanks for the support

On Thu, Apr 22, 2021 at 2:57 PM Matthias Pohl 
wrote:

> I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks
> for bringing it up.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22414
>
> On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
> wrote:
>
>> Hi Yang,
>> isn't this something to fix? If I look at the documentation at  [1], in
>> the "Passing configuration via environment variables" section, there is:
>>
>> "The environment variable FLINK_PROPERTIES should contain a list of Flink
>> cluster configuration options separated by new line,
>> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
>> over configurations in flink-conf.yaml."
>>
>> To me this means that if I specify "env.hadoop.conf.dir" it should be
>> handled as well. Am I wrong?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>>
>>> It seems that the current implementation does not export HADOOP_CONF_DIR as
>>> an environment variable, even when the env.xxx Flink config options are set [1].
>>> The option is only used to construct the classpath for the JM/TM process.
>>> However, "HadoopUtils" [2] does not support reading the Hadoop
>>> configuration from the classpath.
>>>
>>>
>>> [1].
>>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>>> [2].
>>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> On Fri, Apr 16, 2021 at 3:55 AM Flavio Pompermaier wrote:
>>>
>>>> Hi Robert,
>>>> indeed, my docker-compose setup only works if I also add the Hadoop and
>>>> YARN home, whereas I expected those two variables to be generated
>>>> automatically just by setting the env.xxx options in FLINK_PROPERTIES.
>>>>
>>>> I just want to understand what to expect: do I really need to specify the
>>>> Hadoop and YARN home as env variables or not?
>>>>
>>>> On Thu, Apr 15, 2021 at 8:39 PM Robert Metzger wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm not aware of any known issues with Hadoop and Flink on Docker.
>>>>>
>>>>> I also tried what you are doing locally, and it seems to work:
>>>>>
>>>>> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
>>>>> Starting
>>>>> StandaloneSessionClusterEntrypoint.
>>>>> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>>>>> default filesystem.
>>>>> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>>>>> security context.
>>>>> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>>>>>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
>>>>> user set to flink (auth:SIMPLE)
>>>>> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>>>>>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
>>>>> file will be created as /tmp/jaas-811306162058602256.conf.
>>>>> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>>>> Initializing cluster services.
>>>>>
>>>>> Here's my code:
>>>>>
>>>>> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>>>>>
>>>>> Hope this helps!
>>>>>
>>>>> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi everybody,
>>>>>> I'm trying to set up reading from HDFS using docker-compose and Flink
>>>>>> 1.11.3.
>>>>>> If I pass 'env.hadoop.c

Re: Flink Hadoop config on docker-compose

2021-04-15 Thread Yang Wang
It seems that the current implementation does not export HADOOP_CONF_DIR as an
environment variable, even when the env.xxx Flink config options are set [1].
The option is only used to construct the classpath for the JM/TM process.
However, "HadoopUtils" [2] does not support reading the Hadoop
configuration from the classpath.


[1].
https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
[2].
https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
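
To illustrate the consequence for a docker-compose setup: the two `env.*.conf.dir` options set through FLINK_PROPERTIES have to be repeated as real environment variables. The following is a minimal sketch of deriving those variables from a FLINK_PROPERTIES-style string; the helper names and the option-to-variable mapping are assumptions based on this thread, not part of Flink.

```python
from typing import Dict

# Assumed mapping, taken from the options and variables named in this thread.
OPTION_TO_ENV = {
    "env.hadoop.conf.dir": "HADOOP_CONF_DIR",
    "env.yarn.conf.dir": "YARN_CONF_DIR",
}


def parse_flink_properties(text: str) -> Dict[str, str]:
    """Parse newline-separated 'key: value' lines into a dict."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only, so values may contain colons.
        key, sep, value = line.partition(":")
        if sep:
            props[key.strip()] = value.strip()
    return props


def required_env(text: str) -> Dict[str, str]:
    """Return the environment variables that mirror the env.* options."""
    props = parse_flink_properties(text)
    return {env: props[opt] for opt, env in OPTION_TO_ENV.items() if opt in props}


if __name__ == "__main__":
    sample = """
    jobmanager.rpc.address: flink-jobmanager
    env.hadoop.conf.dir: /opt/hadoop/conf
    env.yarn.conf.dir: /opt/hadoop/conf
    """
    for name, value in required_env(sample).items():
        print(f"{name}={value}")
```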


Best,
Yang


On Fri, Apr 16, 2021 at 3:55 AM Flavio Pompermaier wrote:

> Hi Robert,
> indeed, my docker-compose setup only works if I also add the Hadoop and YARN
> home, whereas I expected those two variables to be generated automatically
> just by setting the env.xxx options in FLINK_PROPERTIES.
>
> I just want to understand what to expect: do I really need to specify the
> Hadoop and YARN home as env variables or not?
>
> On Thu, Apr 15, 2021 at 8:39 PM Robert Metzger wrote:
>
>> Hi,
>>
>> I'm not aware of any known issues with Hadoop and Flink on Docker.
>>
>> I also tried what you are doing locally, and it seems to work:
>>
>> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
>> StandaloneSessionClusterEntrypoint.
>> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>> default filesystem.
>> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
>> security context.
>> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
>> user set to flink (auth:SIMPLE)
>> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
>> file will be created as /tmp/jaas-811306162058602256.conf.
>> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>> Initializing cluster services.
>>
>> Here's my code:
>>
>> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>>
>> Hope this helps!
>>
>> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi everybody,
>>> I'm trying to set up reading from HDFS using docker-compose and Flink
>>> 1.11.3.
>>> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
>>> using FLINK_PROPERTIES (under environment section of the docker-compose
>>> service) I see in the logs the following line:
>>>
>>> "Could not find Hadoop configuration via any of the supported method"
>>>
>>> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
>>> generated by the run scripts.
>>> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
>>> environment section of the docker-compose service) I don't see that line.
>>>
>>> Is this the expected behavior?
>>>
>>> Below the relevant docker-compose service I use (I've removed the
>>> content of HADOOP_CLASSPATH content because is too long and I didn't report
>>> the taskmanager that is similar):
>>>
>>> flink-jobmanager:
>>> container_name: flink-jobmanager
>>> build:
>>>   context: .
>>>   dockerfile: Dockerfile.flink
>>>   args:
>>> FLINK_VERSION: 1.11.3-scala_2.12-java11
>>> image: 'flink-test:1.11.3-scala_2.12-java11'
>>> ports:
>>>   - "8091:8081"
>>>   - "8092:8082"
>>> command: jobmanager
>>> environment:
>>>   - |
>>> FLINK_PROPERTIES=
>>> jobmanager.rpc.address: flink-jobmanager
>>> rest.port: 8081
>>> historyserver.web.port: 8082
>>> web.upload.dir: /opt/flink
>>> env.hadoop.conf.dir: /opt/hadoop/conf
>>> env.yarn.conf.dir: /opt/hadoop/conf
>>>   - |
>>> HADOOP_CLASSPATH=...
>>>   - HADOOP_CONF_DIR=/opt/hadoop/conf
>>>   - YARN_CONF_DIR=/opt/hadoop/conf
>>> volumes:
>>>   - 'flink_shared_folder:/tmp/test'
>>>   - 'flink_uploads:/opt/flink/flink-web-upload'
>>>   - 'flink_hadoop_conf:/opt/hadoop/conf'
>>>   - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'
>>>
>>>
>>> Thanks in advance for any support,
>>> Flavio
>>>
>>


Re: Flink Hadoop config on docker-compose

2021-04-15 Thread Flavio Pompermaier
Hi Robert,
indeed my docker-compose setup only works if I also add the Hadoop and YARN
home variables, whereas I was expecting those two variables to be generated
automatically just by setting the env.xxx options in FLINK_PROPERTIES.

I just want to understand what to expect: do I really need to specify the
Hadoop and YARN home as environment variables or not?

On Thu, Apr 15, 2021 at 20:39, Robert Metzger wrote:

> Hi,
>
> I'm not aware of any known issues with Hadoop and Flink on Docker.
>
> I also tried what you are doing locally, and it seems to work:
>
> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
> StandaloneSessionClusterEntrypoint.
> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> default filesystem.
> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> security context.
> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
> user set to flink (auth:SIMPLE)
> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
> file will be created as /tmp/jaas-811306162058602256.conf.
> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Initializing cluster services.
>
> Here's my code:
>
> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>
> Hope this helps!
>
> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier 
> wrote:
>
>> Hi everybody,
>> I'm trying to set up reading from HDFS using docker-compose and Flink
>> 1.11.3.
>> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
>> using FLINK_PROPERTIES (under environment section of the docker-compose
>> service) I see in the logs the following line:
>>
>> "Could not find Hadoop configuration via any of the supported method"
>>
>> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
>> generated by the run scripts.
>> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
>> environment section of the docker-compose service) I don't see that line.
>>
>> Is this the expected behavior?
>>
>> Below the relevant docker-compose service I use (I've removed the content
>> of HADOOP_CLASSPATH content because is too long and I didn't report the
>> taskmanager that is similar):
>>
>> flink-jobmanager:
>> container_name: flink-jobmanager
>> build:
>>   context: .
>>   dockerfile: Dockerfile.flink
>>   args:
>> FLINK_VERSION: 1.11.3-scala_2.12-java11
>> image: 'flink-test:1.11.3-scala_2.12-java11'
>> ports:
>>   - "8091:8081"
>>   - "8092:8082"
>> command: jobmanager
>> environment:
>>   - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: flink-jobmanager
>> rest.port: 8081
>> historyserver.web.port: 8082
>> web.upload.dir: /opt/flink
>>     env.hadoop.conf.dir: /opt/hadoop/conf
>> env.yarn.conf.dir: /opt/hadoop/conf
>>   - |
>> HADOOP_CLASSPATH=...
>>   - HADOOP_CONF_DIR=/opt/hadoop/conf
>>   - YARN_CONF_DIR=/opt/hadoop/conf
>> volumes:
>>   - 'flink_shared_folder:/tmp/test'
>>   - 'flink_uploads:/opt/flink/flink-web-upload'
>>   - 'flink_hadoop_conf:/opt/hadoop/conf'
>>   - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'
>>
>>
>> Thanks in advance for any support,
>> Flavio
>>
>


Re: Flink Hadoop config on docker-compose

2021-04-15 Thread Robert Metzger
Hi,

I'm not aware of any known issues with Hadoop and Flink on Docker.

I also tried what you are doing locally, and it seems to work:

flink-jobmanager| 2021-04-15 18:37:48,300 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
StandaloneSessionClusterEntrypoint.
flink-jobmanager| 2021-04-15 18:37:48,338 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
default filesystem.
flink-jobmanager| 2021-04-15 18:37:48,375 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
security context.
flink-jobmanager| 2021-04-15 18:37:48,404 INFO
 org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
user set to flink (auth:SIMPLE)
flink-jobmanager| 2021-04-15 18:37:48,408 INFO
 org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
file will be created as /tmp/jaas-811306162058602256.conf.
flink-jobmanager| 2021-04-15 18:37:48,415 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Initializing cluster services.

Here's my code:

https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39

Hope this helps!

On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier 
wrote:

> Hi everybody,
> I'm trying to set up reading from HDFS using docker-compose and Flink
> 1.11.3.
> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
> using FLINK_PROPERTIES (under environment section of the docker-compose
> service) I see in the logs the following line:
>
> "Could not find Hadoop configuration via any of the supported method"
>
> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
> generated by the run scripts.
> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
> environment section of the docker-compose service) I don't see that line.
>
> Is this the expected behavior?
>
> Below the relevant docker-compose service I use (I've removed the content
> of HADOOP_CLASSPATH content because is too long and I didn't report the
> taskmanager that is similar):
>
> flink-jobmanager:
> container_name: flink-jobmanager
> build:
>   context: .
>   dockerfile: Dockerfile.flink
>   args:
> FLINK_VERSION: 1.11.3-scala_2.12-java11
> image: 'flink-test:1.11.3-scala_2.12-java11'
> ports:
>   - "8091:8081"
>   - "8092:8082"
> command: jobmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: flink-jobmanager
>     rest.port: 8081
> historyserver.web.port: 8082
> web.upload.dir: /opt/flink
> env.hadoop.conf.dir: /opt/hadoop/conf
>     env.yarn.conf.dir: /opt/hadoop/conf
>   - |
> HADOOP_CLASSPATH=...
>   - HADOOP_CONF_DIR=/opt/hadoop/conf
>   - YARN_CONF_DIR=/opt/hadoop/conf
> volumes:
>   - 'flink_shared_folder:/tmp/test'
>   - 'flink_uploads:/opt/flink/flink-web-upload'
>   - 'flink_hadoop_conf:/opt/hadoop/conf'
>   - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'
>
>
> Thanks in advance for any support,
> Flavio
>


Flink Hadoop config on docker-compose

2021-04-14 Thread Flavio Pompermaier
Hi everybody,
I'm trying to set up reading from HDFS using docker-compose and Flink
1.11.3.
If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
using FLINK_PROPERTIES (under environment section of the docker-compose
service) I see in the logs the following line:

"Could not find Hadoop configuration via any of the supported method"

If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
generated by the run scripts.
Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
environment section of the docker-compose service) I don't see that line.

Is this the expected behavior?

Below the relevant docker-compose service I use (I've removed the content
of HADOOP_CLASSPATH content because is too long and I didn't report the
taskmanager that is similar):

flink-jobmanager:
container_name: flink-jobmanager
build:
  context: .
  dockerfile: Dockerfile.flink
  args:
FLINK_VERSION: 1.11.3-scala_2.12-java11
image: 'flink-test:1.11.3-scala_2.12-java11'
ports:
  - "8091:8081"
  - "8092:8082"
command: jobmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
historyserver.web.port: 8082
web.upload.dir: /opt/flink
env.hadoop.conf.dir: /opt/hadoop/conf
    env.yarn.conf.dir: /opt/hadoop/conf
  - |
HADOOP_CLASSPATH=...
  - HADOOP_CONF_DIR=/opt/hadoop/conf
      - YARN_CONF_DIR=/opt/hadoop/conf
volumes:
  - 'flink_shared_folder:/tmp/test'
  - 'flink_uploads:/opt/flink/flink-web-upload'
  - 'flink_hadoop_conf:/opt/hadoop/conf'
  - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'


Thanks in advance for any support,
Flavio


Re: Hadoop is not in the classpath/dependencies

2021-03-30 Thread Chesnay Schepler
This looks related to HDFS-12920, where Hadoop 2.x tries to read a
duration from hdfs-default.xml expecting plain numbers, but in 3.x the
values also contain time units.
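
The mismatch can be reproduced without Hadoop at all. The sketch below is an illustration, not Hadoop's code: `parsePlain` stands in for a reader that expects a bare number, and `parseWithUnit` is a hypothetical unit-aware parser that handles only the suffixes `ms`, `s`, `m` and `h`. Both names are assumptions made up for this example.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

public class DurationParseDemo {

    // Stand-in for a reader that expects a plain number of seconds.
    static long parsePlain(String value) {
        return Long.parseLong(value.trim());
    }

    // Hypothetical unit-aware parser; returns whole seconds.
    // Supports only the suffixes ms, s, m and h; no suffix means seconds.
    static long parseWithUnit(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (v.endsWith("ms")) {
            long millis = Long.parseLong(v.substring(0, v.length() - 2));
            return TimeUnit.MILLISECONDS.toSeconds(millis);
        } else if (v.endsWith("s")) {
            return Long.parseLong(v.substring(0, v.length() - 1));
        } else if (v.endsWith("m")) {
            long minutes = Long.parseLong(v.substring(0, v.length() - 1));
            return TimeUnit.MINUTES.toSeconds(minutes);
        } else if (v.endsWith("h")) {
            long hours = Long.parseLong(v.substring(0, v.length() - 1));
            return TimeUnit.HOURS.toSeconds(hours);
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) {
        System.out.println("unit-aware: " + parseWithUnit("30s"));
        try {
            parsePlain("30s");
        } catch (NumberFormatException e) {
            // Same exception type as in the reported stack trace.
            System.out.println("plain: " + e.getMessage());
        }
    }
}
```

If that is what happens here, the fix would be to make the Hadoop client classes and the hdfs-default.xml they read come from the same Hadoop major version.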


On 3/30/2021 9:37 AM, Matthias Seiler wrote:


Thank you all for the replies!


I did as @Maminspapin suggested and indeed the previous error 
disappeared, but now the exception is

```
java.io.IOException: Cannot instantiate file system for URI: 
hdfs://node-1:9000/flink

//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I thought that it relates to the windowing I do, which has a slide 
interval of 30 seconds, but removing it displays the same error.


I also added the dependency to the maven pom, but without effect.

Since I use Hadoop 3.2.1, I also tried 
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber 
but with this I can't even start a cluster (`TaskManager 
initialization failed`).




@Robert, Flink includes roughly 100 hdfs jars. 
`hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to 
contain `DistributedFileSystem.class`, which I checked running `jar 
tvf hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep 
DistributedFileSystem`. How can I verify that the class is really 
accessible?


Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:

Hey Matthias,

Maybe the classpath contains hadoop libraries, but not the HDFS 
libraries? The "DistributedFileSystem" class needs to be accessible 
to the classloader. Can you check if that class is available?


Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler 
<mailto:matthias.sei...@campus.tu-berlin.de>> wrote:


Hello everybody,

I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
machines.
The job should store the checkpoints on HDFS like so:
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws
```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not
find a file system implementation for scheme 'hdfs'. The scheme
is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems,
please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/>.
// ...
Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is
not in the classpath/dependencies.
```
and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
wildcards. Flink's JobManager prints the classpath which includes
specific packages from these Hadoop libraries. Besides that, Flink
creates the state directories on HDFS, but no content.

Thank you for any advice,
Matthias





Re: Hadoop is not in the classpath/dependencies

2021-03-30 Thread Matthias Seiler
Thank you all for the replies!


I did as @Maminspapin suggested and indeed the previous error
disappeared, but now the exception is
```
java.io.IOException: Cannot instantiate file system for URI:
hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I thought that it relates to the windowing I do, which has a slide
interval of 30 seconds, but removing it displays the same error.

I also added the dependency to the maven pom, but without effect.

Since I use Hadoop 3.2.1, I also tried
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber
but with this I can't even start a cluster (`TaskManager initialization
failed`).



@Robert, Flink includes roughly 100 hdfs jars.
`hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to contain
`DistributedFileSystem.class`, which I checked running `jar tvf
hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep
DistributedFileSystem`. How can I verify that the class is really
accessible?
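
One way to check this outside of Flink is a tiny program that asks a class loader for the classes by name. This is a minimal sketch under assumptions: `ClassAvailabilityCheck` and `isLoadable` are names invented for the example, and it only tests the classpath the program itself is started with, so it has to be launched with the same classpath the JobManager prints in its log. Flink's own class loading may still behave differently.

```java
public class ClassAvailabilityCheck {

    // Returns true if the named class can be located by the given class loader.
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String[] names = {
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.hadoop.hdfs.DistributedFileSystem"
        };
        for (String name : names) {
            System.out.println(name + " -> " + isLoadable(name, loader));
        }
    }
}
```

A `false` for `DistributedFileSystem` while the other two report `true` would point to the HDFS client jar missing from the effective classpath, even if the jar exists on disk.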

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:
> Hey Matthias,
>
> Maybe the classpath contains hadoop libraries, but not the HDFS
> libraries? The "DistributedFileSystem" class needs to be accessible to
> the classloader. Can you check if that class is available?
>
> Best,
> Robert
>
> On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler
>  <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>
> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
> machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/>.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>


Re: Hadoop is not in the classpath/dependencies

2021-03-26 Thread Robert Metzger
Hey Matthias,

Maybe the classpath contains hadoop libraries, but not the HDFS libraries?
The "DistributedFileSystem" class needs to be accessible to the
classloader. Can you check if that class is available?

Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler <
matthias.sei...@campus.tu-berlin.de> wrote:

> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>
>


????hadoop#configuration

2021-03-25 Thread ????
hi all
onyarn31??flink-confhadoop yarn
https://issues.apache.org/jira/browse/FLINK-21981


2??hadoop#configuration??yarnyarn??configuration??yarn??configurationconfiguration??
https://issues.apache.org/jira/browse/FLINK-21982


3??flink??-ythdfs-site/core-site??flink??hadoop#configuration??hdfs/core-siteconf??configuration??hdfs/core-stie??classloader.getresource2??


reviewmerge FLINK-21640


thanks all

Re: Hadoop is not in the classpath/dependencies

2021-03-25 Thread Maminspapin
I downloaded the lib (latest version) from here:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/

and put it in the flink_home/lib directory.

It helped.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Hadoop is not in the classpath/dependencies

2021-03-25 Thread Maminspapin
I have the same problem  ...





Hadoop is not in the classpath/dependencies

2021-03-25 Thread Matthias Seiler
Hello everybody,

I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
The job should store the checkpoints on HDFS like so:
```java
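import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 15 s with exactly-once guarantees and keep the state on HDFS.
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));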
```

Unfortunately, the JobManager throws
```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems,
please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
// ...
Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
not in the classpath/dependencies.
```
and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
wildcards. Flink's JobManager prints the classpath which includes
specific packages from these Hadoop libraries. Besides that, Flink
creates the state directories on HDFS, but no content.
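
For comparing what `HADOOP_CLASSPATH` promises with what a JVM really ends up with, a small filter over the `java.class.path` system property can help. The sketch below is only an illustration: `ClasspathHadoopEntries` and `hadoopEntries` are invented names, and matching on the substring "hadoop" is a rough heuristic rather than a proper check.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

public class ClasspathHadoopEntries {

    // Returns the classpath entries whose path mentions "hadoop" (case-insensitive).
    static List<String> hadoopEntries(String classpath, String separator) {
        List<String> hits = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            if (entry.toLowerCase(Locale.ROOT).contains("hadoop")) {
                hits.add(entry);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Inspects the classpath of this JVM only.
        String classpath = System.getProperty("java.class.path");
        for (String entry : hadoopEntries(classpath, File.pathSeparator)) {
            System.out.println(entry);
        }
    }
}
```

The same helper can be fed the classpath line copied from the JobManager log to see at a glance whether a `hadoop-hdfs-client` jar is listed.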

Thank you for any advice,
Matthias



Can Flink JobManager HA be restarted manually like the Hadoop NameNode?

2021-03-21 Thread macdoor
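Can the Flink JobManager in HA mode be restarted manually like the Hadoop NameNode, while the cluster keeps running normally?

I have noticed that the JobManager's memory usage seems to keep growing slowly. The Hadoop NameNode has the same problem, and I work around it by rolling-restarting the Hadoop
NameNodes at regular intervals. In HA mode, can the Flink JobManagers be rolling-restarted as well?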





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Hadoop Integration Link broken in downloads page

2021-03-10 Thread Till Rohrmann
Thanks a lot for reporting this problem Debraj. I've created a JIRA issue
for it [1].

[1] https://issues.apache.org/jira/browse/FLINK-21723

Cheers,
Till

On Tue, Mar 9, 2021 at 5:28 AM Debraj Manna 
wrote:

> Hi
>
> It appears the Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
> link is broken on downloads <https://flink.apache.org/downloads.html>
> page.
>
> Apache Flink® 1.12.2 is our latest stable release.
>> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
>> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
>> system connector), please check out the Hadoop Integration
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
>>  documentation.
>
>
> It is throwing 404 Error.
>
> Thanks
>
>


Hadoop Integration Link broken in downloads page

2021-03-08 Thread Debraj Manna
Hi

It appears the Hadoop Integration
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
link is broken on downloads <https://flink.apache.org/downloads.html> page.

Apache Flink® 1.12.2 is our latest stable release.
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector), please check out the Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
>  documentation.


It is throwing 404 Error.

Thanks


[DISCUSS] Removal of flink-swift-fs-hadoop module

2021-01-26 Thread Robert Metzger
Hi all,

During a security maintenance PR [1], Chesnay noticed that the
flink-swift-fs-hadoop module is lacking test coverage [2].
Also, there hasn't been any substantial change since 2018, when it was
introduced.
On the user@ ML, I could not find any proof of significant use of the
module (no one mentioned any problems with it).

*I propose to remove this module in Flink 1.13*. Otherwise, we would
release a module with known vulnerable dependencies, and an unknown
stability.
If there are users, they can still use the 1.12.0 release of it. If we
notice that there are a lot of users, we can reintroduce the FS, and add
proper tests for it.

Please let me know if you have any concerns, otherwise, I'll remove it.


[1] https://github.com/apache/flink/pull/14749
[2] https://issues.apache.org/jira/browse/FLINK-20804


Re: Re: Re: Flink job writing to HDFS fails on submission. Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread 赵一旦
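Also, regarding writing ORC files: if anyone knows how to write a Map-typed column, please share an example.
As shown below, once I have the MapColumnVector, how do I write into it? For simple non-Map fields it is fairly clear: just set the value of xxxColumnVector.vector[rowId]. But the MapColumnVector API is rather confusing and I could not work out how to use it.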

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];


赵一旦 wrote on Sat, Jan 23, 2021 at 1:42 PM:

> Solved. I overrode this part of the Flink source code and removed the restriction that rejects non-hdfs schemes.
>
> 张锴 wrote on Thu, Jan 21, 2021 at 7:35 PM:
>
>> @赵一旦
>> Also, I asked you another question last time. I tried the idea you suggested, but there seems to be a problem with it; could you take a look?
>>
>> 张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM:
>>
>> > I am using Flink 1.10. The FileSink I mentioned is BucketingSink; that is what I use to write to HDFS.
>> >
>> > 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
>> >
>> >> @Michael Ran: OK, no problem.
>> >>
>> >> @张锴 Which Flink version's connector do you mean, stream or SQL? I searched and my version does not have it. I am on 1.12, stream.
>> >>
>> >>
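>> According to the docs there is currently StreamingFileSink and also FileSink, and judging from the docs they are used in much the same way. I plan to try FileSink, but I am not sure how FileSink differs from StreamingFileSink, or whether both can write to Hadoop-style file systems, because this touches on whether writes are atomic; after all, distributed file systems do not support appending or in-place edits.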
>> >>
>> >> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
>> >>
>> >> >
>> >> >
>> >>
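>> Sorry, I have not used this for a long time. But you can analyze the exception message and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you will have to implement your own sink. Alternatively, if your file system's write path is compatible with the upper-level HDFS API, you can refer to how the individual sinks are written.
>> >> > On 2021-01-21 18:45:06, "张锴" wrote: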
>> >> > >import
>> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
>> >> > >DateTimeBucketer}
>> >> > >
>> >> > >Try it this way: sink.setBucketer, sink.setWriter
>> >> > >
>> >> > >
>> >> > >
>> >> > >赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
>> >> > >
>> >> > >> @Michael Ran
>> >> > >> So is there a solution? This exception occurs when I write to HDFS using Flink's StreamingFileSink.
>> >> > >>
>> >> > >> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
>> >> > >>
>> >> > >> > This is probably using an HDFS-specific API that your file system is not compatible with:
>> >> > >> > public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>> >> > >> > On 2021-01-21 17:18:23, "赵一旦" wrote:
>> >> > >> > >The full error message is as follows:
>> >> > >> > >
>> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
>> >> > Hadoop
>> >> > >> are
>> >> > >> > >only supported for HDFS
>> >> > >> > >at
>> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
>> >> > >> > >HadoopRecoverableWriter.java:61)
>> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
>> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
>> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
>> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> >
>> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
>> >> > >> > >.java:260)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> >
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>> >> > >> > >at
>> org.apache.flink.streaming.api.functions.sink.filesystem.
>> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
>> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
>> >> > >> > >at
>> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
>> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
>> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
>> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
>> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
>> >> > >> > >at
>> >> > >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
>> >> > >> > >at
>> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> >> > >> > >StreamTask.java:501)
>> >> > >> > >at
>> >> > >> >
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
>> >> > >> > >.java:531)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> >> > >> > >at
>> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> >> > >> > >at java.lang.Thread.run(Thread.java:748)
>> >> > >> > >
>> >> > >> > >
>> >> > >> > >赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
>> >> > >> > >
>> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
>> >> > >> > >>
>> >> > >> > >> As above: we use the Hadoop protocol, but the underlying storage is not HDFS; it is a distributed file system developed in-house.
>> >> > >> > >>
>> >> > >> > >> Writing with Spark, reading with spark-sql, and so on all work fine. But with Flink, neither writing nor reading has succeeded so far.
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> > >>
>> >> > >> >
>> >> > >>
>> >> >
>> >>
>> >
>>
>


Re: Re: Re: Flink job writing to HDFS fails on submission. Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread 赵一旦
Solved. I overrode this part of the Flink source code and removed the restriction that rejects non-hdfs schemes.
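
Judging from the error message and the stack trace (HadoopRecoverableWriter.java:61), the restriction looks like a scheme check made when the recoverable writer is constructed. The following is a minimal sketch of such a guard, assuming it is a plain scheme comparison; `RecoverableWriterSchemeCheck`, `requireHdfs` and the `myfs://` URI are hypothetical, and the real Flink code may check more than this.

```java
import java.net.URI;

public class RecoverableWriterSchemeCheck {

    // Assumed shape of the guard: reject every file system whose scheme is not "hdfs".
    static void requireHdfs(URI fsUri) {
        String scheme = fsUri.getScheme();
        if (!"hdfs".equalsIgnoreCase(scheme)) {
            throw new UnsupportedOperationException(
                    "Recoverable writers on Hadoop are only supported for HDFS");
        }
    }

    public static void main(String[] args) {
        // Passes silently: the scheme is hdfs.
        requireHdfs(URI.create("hdfs://node-1:9000/flink"));
        try {
            // Hypothetical in-house file system that speaks the Hadoop protocol.
            requireHdfs(URI.create("myfs://cluster/flink"));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Removing such a check only helps if the underlying file system really provides the operations the recoverable writer relies on, so a patched build should be tested with failure and recovery scenarios before it is trusted.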

张锴 wrote on Thu, Jan 21, 2021 at 7:35 PM:

> @赵一旦
> Also, I asked you another question last time. I tried the idea you suggested, but there seems to be a problem with it; could you take a look?
>
> 张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM:
>
> > I am using Flink 1.10. The FileSink I mentioned is BucketingSink; that is what I use to write to HDFS.
> >
> > 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
> >
> >> @Michael Ran: OK, no problem.
> >>
> >> @张锴 Which Flink version's connector do you mean, stream or SQL? I searched and my version does not have it. I am on 1.12, stream.
> >>
> >>
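> According to the docs there is currently StreamingFileSink and also FileSink, and judging from the docs they are used in much the same way. I plan to try FileSink, but I am not sure how FileSink differs from StreamingFileSink, or whether both can write to Hadoop-style file systems, because this touches on whether writes are atomic; after all, distributed file systems do not support appending or in-place edits.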
> >>
> >> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
> >>
> >> >
> >> >
> >>
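> Sorry, I have not used this for a long time. But you can analyze the exception message and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you will have to implement your own sink. Alternatively, if your file system's write path is compatible with the upper-level HDFS API, you can refer to how the individual sinks are written.
> >> > On 2021-01-21 18:45:06, "张锴" wrote: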
> >> > >import
> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >> > >DateTimeBucketer}
> >> > >
> >> > >Try it this way: sink.setBucketer, sink.setWriter
> >> > >
> >> > >
> >> > >
> >> > >赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
> >> > >
> >> > >> @Michael Ran
> >> > >> So is there a solution? This exception occurs when I write to HDFS using Flink's StreamingFileSink.
> >> > >>
> >> > >> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
> >> > >>
> >> > >> > This is probably using an HDFS-specific API that your file system is not compatible with:
> >> > >> > public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >> > >> > On 2021-01-21 17:18:23, "赵一旦" wrote:
> >> > >> > >The full error message is as follows:
> >> > >> > >
> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> >> > Hadoop
> >> > >> are
> >> > >> > >only supported for HDFS
> >> > >> > >at
> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(
> >> > >> > >HadoopRecoverableWriter.java:61)
> >> > >> > >at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> >> > >> > >at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> >
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >> > >> > >.java:260)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >> > >> > >at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >> > >> > >at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >> > >> > >at
> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >> > >> > >at org.apache.flink.streaming.runtime.tasks.StreamTask
> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> >> > >> > >at
> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> >> > >> > >at
> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >> > >> > >StreamTask.java:501)
> >> > >> > >at
> >> > >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >> > >> > >.java:531)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >> > >> > >at
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >> > >> > >at java.lang.Thread.run(Thread.java:748)
> >> > >> > >
> >> > >> > >
> >> > >> > >赵一旦  于2021年1月21日周四 下午5:17写道:
> >> > >> > >
> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
> >> > >> > >>
> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >> > >> > >>
> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >> > >> > >>
> >> > >> > >>
> >> > >> > >>
> >> > >> >
> >> > >>
> >> >
> >>
> >
>


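Re: Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 张锴
@赵一旦
Also, I asked you another question last time. I tried the approach you suggested, but it seems to have a problem. Could you take a look?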



Re: Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 张锴
I'm on Flink 1.10, where the file sink is BucketingSink. That is what I use to write to HDFS.



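Re: Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 赵一旦
@Michael Ran: OK, no problem.

@张锴 Which Flink version's connector do you mean, and is it for the DataStream API or SQL? I searched and my version doesn't have it. I'm on 1.12, using the DataStream API.
The docs currently list StreamingFileSink and also FileSink, and judging from the docs they are used in much the same way. I plan to try FileSink, but I'm not sure how FileSink differs from StreamingFileSink, or whether both can write to Hadoop-compatible file systems. This matters because it comes down to whether writes are atomic; after all, distributed file systems generally don't support append or in-place edits.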



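Re: Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread Michael Ran
Sorry, I haven't used this in a long time. You can analyze the exception message together with the API source code to determine whether writing directly is possible. If you need to write to a custom file system, you will have to implement your own sink. Alternatively, if your file system's write path is compatible with the higher-level HDFS API, you can use the existing sink implementations as a reference.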
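
For the "implement your own sink" option, one common pattern on file systems without append or truncate is to write to an in-progress file and publish it with a rename. The sketch below only illustrates that idea on the local file system with java.nio; it is not how StreamingFileSink works internally, and RenameCommitSketch, writeAndCommit and the ".inprogress" suffix are hypothetical names. Whether rename is atomic on an in-house file system is an assumption that would need to be verified.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Illustrative only: uses the local file system through java.nio, not Flink or Hadoop APIs.
final class RenameCommitSketch {

    /** Writes all lines to a hidden in-progress file, then renames it to the final name. */
    static Path writeAndCommit(Path dir, String fileName, List<String> lines) throws IOException {
        Files.createDirectories(dir);
        Path inProgress = dir.resolve("." + fileName + ".inprogress");
        Path finished = dir.resolve(fileName);

        // Readers never see a half-written file under the final name.
        Files.write(inProgress, lines, StandardCharsets.UTF_8);

        // Publish the finished file in a single rename step.
        Files.move(inProgress, finished, StandardCopyOption.ATOMIC_MOVE);
        return finished;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sink-sketch");
        Path out = writeAndCommit(dir, "part-0-0", Arrays.asList("a", "b"));
        System.out.println(Files.readAllLines(out));
    }
}
```

A real sink would also have to tie the rename to checkpoint completion to avoid duplicates after a restart; that part is deliberately left out here.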


Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 张锴
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

Try it this way, configuring the sink with sink.setBucketer and sink.setWriter.
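
To make the bucketing idea concrete, here is a small stand-alone sketch of what a time-based bucketer does: it maps a record's timestamp to a sub-directory under the base path. It does not use Flink's DateTimeBucketer class; TimeBucketSketch, bucketPath and the "/data/out" path are hypothetical, and the hourly pattern is an assumption chosen for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: a stand-in for time-based bucketing, not Flink's DateTimeBucketer.
final class TimeBucketSketch {

    // Pattern assumed for illustration: one bucket directory per hour.
    private static final String PATTERN = "yyyy-MM-dd--HH";

    /** Returns basePath + "/" + the formatted hour bucket for the given timestamp. */
    static String bucketPath(String basePath, long epochMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN).withZone(zone);
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2021-01-21T09:17:00Z").toEpochMilli();
        // Prints /data/out/2021-01-21--09
        System.out.println(bucketPath("/data/out", ts, ZoneOffset.UTC));
    }
}
```

With the real sink, the bucketer passed to sink.setBucketer plays this role, while the writer passed to sink.setWriter decides how each record is serialized into the bucket's part files.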





Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 赵一旦
@Michael Ran
So is there any solution? I hit this exception when writing to HDFS with Flink's StreamingFileSink.



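Re: Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread Michael Ran
This code path probably relies on an HDFS-specific API that your file system is not compatible with. See the constructor:
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}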
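
Going by the exception message and the fact that it is thrown from that constructor, my assumption is that the writer rejects any Hadoop FileSystem whose scheme is not "hdfs". The following is only a stand-alone sketch of that kind of check, not Flink's source; SchemeCheckSketch, requireHdfs, isSupported and the "myfs" scheme are hypothetical names.

```java
import java.net.URI;

// Illustrative only: mimics the kind of scheme check that would produce the error above.
// This is not Flink code; the class and method names are made up for the example.
final class SchemeCheckSketch {

    /** Throws if the URI scheme is not "hdfs" (compared case-insensitively). */
    static void requireHdfs(URI fsUri) {
        String scheme = fsUri.getScheme();
        if (!"hdfs".equalsIgnoreCase(scheme)) {
            throw new UnsupportedOperationException(
                    "Recoverable writers on Hadoop are only supported for HDFS");
        }
    }

    /** Convenience wrapper that reports the result instead of throwing. */
    static boolean isSupported(URI fsUri) {
        try {
            requireHdfs(fsUri);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "myfs" stands in for an in-house Hadoop-compatible file system scheme.
        System.out.println(isSupported(URI.create("hdfs://namenode:8020/out")));
        System.out.println(isSupported(URI.create("myfs://cluster/out")));
    }
}
```

If the real check works like this, a file system that speaks the Hadoop protocol under a different scheme would fail at sink initialization regardless of how compatible its read and write paths are.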


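Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 赵一旦
Besides that, reading the existing Hive data warehouse with Flink SQL also fails. The Hive catalog is configured correctly and the table metadata all shows up, but SELECT queries just fail.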



Re: Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 赵一旦
The full error is as follows:

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)




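Flink fails on job submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 Thread 赵一旦
Recoverable writers on Hadoop are only supported for HDFS

As shown above: we use the Hadoop protocol, but the underlying storage is not HDFS. It is a distributed file system developed in-house.

Writing with Spark and reading with Spark SQL both work fine, but so far I have not managed to read or write with Flink.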


Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-22 Thread superainbower
K8SHA??HDFS




Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-21 Thread liujian
Thanks,flink-confhistory 
server,??hdfs??,??web ui??,




 


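Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-21 Thread Yang Wang
The history-server has nothing to do with native K8s. If you want to use it, you need to deploy the history-server separately inside the K8s cluster with its own Deployment.

What native K8s covers is how a Flink job is submitted natively to a K8s cluster.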

Best,
yang

liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 8:16 PM:

> Thanks. The docker approach you describe below does work in my test, but I don't know how to do this with Native K8s. Could you explain in detail?
> So far I have tried both of the following Dockerfile variants:
>
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh"]
> EXPOSE 6123 8081 8082
> CMD ["help","history-server"]
>
> ----
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
> EXPOSE 6123 8081 8082
> CMD ["help"]
>
> I have tried both of these; please advise.
>

Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-21 Thread liujian
Thanks. The docker approach you describe below does work in my test, but I don't know how to do this with Native K8s. Could you explain in detail?
So far I have tried both of the following Dockerfile variants:


COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081 8082
CMD ["help","history-server"]

----
COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
EXPOSE 6123 8081 8082
CMD ["help"]



I have tried both of these; please advise.








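Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-20 Thread Yang Wang
Yes, your understanding is correct: once started, the history-server listens on a port.

It works fine when I try it on my side. You can start it with the following command:
docker run -p 8082:8082 --env FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs" flink:latest history-server

For more configuration options, see the following document: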
https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang
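
A minimal sketch, not from the thread, of how the FLINK_PROPERTIES value passed to the container could be assembled in a shell script. The function name history_server_properties is hypothetical; historyserver.archive.fs.dir and port 8082 come from the messages, while historyserver.web.port is assumed here to be the matching Flink option for the listening port.

```shell
#!/bin/sh
# Build a FLINK_PROPERTIES value for the history server.
# $1: archive directory URI (for example file:///tmp/flink-jobs)
# $2: web port; defaults to 8082, the port used in the thread
history_server_properties() {
  printf 'historyserver.archive.fs.dir: %s\nhistoryserver.web.port: %s\n' \
    "$1" "${2:-8082}"
}

# Possible use with the image from the thread (requires docker, so it is left commented out):
#   docker run -p 8082:8082 \
#     --env FLINK_PROPERTIES="$(history_server_properties file:///tmp/flink-jobs)" \
#     flink:latest history-server
```

Keeping the properties in one function makes it easier to switch the archive directory to an HDFS path later without retyping the docker command.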

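liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 1:35 PM:

> My understanding is that starting a history-server creates a process that then exposes the specified port, but I don't seem to see that happen. Is my understanding wrong?
>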


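Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-20 Thread liujian
My understanding is that starting a history-server creates a process that then exposes the specified port, but I don't seem to see that happen. Is my understanding wrong?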




 


Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-20 Thread Yang Wang
You don't need to modify CMD. The default entrypoint is docker-entrypoint.sh [1], which supports history-server; you only need to pass history-server as an argument.

[1].
https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Best,
Yang


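liujian <13597820...@qq.com> wrote on Sun, Dec 20, 2020 at 12:45 PM:

> Thanks,
> but I need to access the HistoryServer; how should I go about that? I changed the Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082, but that does not seem to have the desired effect.
>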


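Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-19 Thread liujian
Thanks,
but I need to access the HistoryServer; how should I go about that? I changed the Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082, but that does not seem to have the desired effect.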




 


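Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-19 Thread Yang Wang
You only need to set the HADOOP_CONF_DIR environment variable on the Flink client side.
The Flink client automatically creates a separate ConfigMap from hdfs-site.xml and core-site.xml and mounts it into the JobManager and TaskManager.
These two files are also added to the classpath automatically, so as long as flink-shaded-hadoop is in lib, nothing else is needed and HDFS can be accessed directly.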


Best,
Yang
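
A minimal sketch, not from the thread, of a check that could be run on the client before submitting: it only verifies that the directory named by HADOOP_CONF_DIR contains the two files mentioned in this reply. The function name check_hadoop_conf_dir and the example path are hypothetical.

```shell
#!/bin/sh
# Succeed only if the given directory holds both Hadoop config files
# that the Flink client is said to ship to the JobManager and TaskManager.
check_hadoop_conf_dir() {
  dir=$1
  for f in core-site.xml hdfs-site.xml; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing $dir/$f" >&2
      return 1
    fi
  done
  return 0
}

# Example (the path is an assumption; use your own Hadoop config directory):
#   export HADOOP_CONF_DIR=/etc/hadoop/conf
#   check_hadoop_conf_dir "$HADOOP_CONF_DIR" && echo "config files found"
```

For an HA HDFS setup this catches the common case where HADOOP_CONF_DIR points at a directory that lacks hdfs-site.xml, before any ConfigMap is created from it.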

liujian <13597820...@qq.com> wrote on Sat, Dec 19, 2020 at 8:29 PM:

> HDFS runs in HA mode, so hdfs-site.xml has to be specified. How should this be handled: use a ConfigMap, or put hdfs-site.xml in the $FLINK_HOME/conf directory?
>


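Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-19 Thread liujian
HDFS runs in HA mode, so hdfs-site.xml has to be specified. How should this be handled: use a ConfigMap, or put hdfs-site.xml in the $FLINK_HOME/conf directory?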



  

Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-16 Thread Yang Wang
If you access HDFS on K8s, you still need to put flink-shaded-hadoop in the lib directory, because Hadoop's FileSystem currently cannot be loaded as a plugin.

Best,
Yang

superainbower wrote on Wed, Dec 16, 2020 at 6:19 PM:

> Piggybacking on this thread: how do you access HDFS when deploying on K8S? For now I still bake the shaded jar into the image.
> On Dec 16, 2020 at 10:53, Yang Wang wrote:
>
> Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example.
>
> 2.8.3 is the Hadoop version and 10.0 is the flink-shaded [1] version.
>
> Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2].
> This makes Flink Hadoop-free, so it can support both Hadoop 2 and Hadoop 3.
>
> If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
>
>
> [1]. https://github.com/apache/flink-shaded
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>
> Best,
> Yang
>
> 赢峰 wrote on Fri, Dec 11, 2020 at 8:45 AM:
>
> > How to choose the flink-shaded-hadoop-2-uber version?
> >
> > What does each part of xxx-xxx mean?
> >


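Reply: How to choose the flink-shaded-hadoop-2-uber version

2020-12-16 Thread superainbower
Piggybacking on this thread: how do you access HDFS when deploying on K8S? For now I still bake the shaded jar into the image.

On Dec 16, 2020 at 10:53, Yang Wang wrote:
Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example.

2.8.3 is the Hadoop version and 10.0 is the flink-shaded [1] version.

Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2].
This makes Flink Hadoop-free, so it can support both Hadoop 2 and Hadoop 3.

If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.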

[1]. https://github.com/apache/flink-shaded
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation

Best,
Yang

赢峰 wrote on Fri, Dec 11, 2020 at 8:45 AM:

> How to choose the flink-shaded-hadoop-2-uber version?
>
> What does each part of xxx-xxx mean?
>


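Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-15 Thread Yang Wang
Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example.

2.8.3 is the Hadoop version and 10.0 is the flink-shaded [1] version.

Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2].
This makes Flink Hadoop-free, so it can support both Hadoop 2 and Hadoop 3.

If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
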

[1]. https://github.com/apache/flink-shaded
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation

Best,
Yang
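
A minimal sketch, not from the thread, of the naming scheme described in this reply: the part before the last hyphen is read as the Hadoop version and the part after it as the flink-shaded version. The function name split_shaded_version is hypothetical.

```shell
#!/bin/sh
# Split a "<hadoop version>-<flink-shaded version>" string such as 2.8.3-10.0.
split_shaded_version() {
  version=$1
  hadoop_version=${version%-*}     # everything before the last "-"
  shaded_version=${version##*-}    # everything after the last "-"
  printf 'hadoop=%s flink-shaded=%s\n' "$hadoop_version" "$shaded_version"
}

split_shaded_version "2.8.3-10.0"   # prints: hadoop=2.8.3 flink-shaded=10.0
```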

赢峰 wrote on Fri, Dec 11, 2020 at 8:45 AM:

> How to choose the flink-shaded-hadoop-2-uber version?
>
> What does each part of xxx-xxx mean?
>


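Re: Determining the flink-shaded-hadoop-2-uber*-* version

2020-12-15 Thread Yang Wang
You need to confirm that hadoop classpath returns a complete result; normally the hadoop classpath command includes all Hadoop jars.
If a class or method is reported as missing, check that the corresponding jar exists and is actually included.

The community recommends the hadoop classpath approach mainly to make Flink Hadoop-free, so that it runs on both Hadoop 2 and Hadoop 3.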

Best,
Yang

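Jacob <17691150...@163.com> wrote on Tue, Dec 15, 2020 at 9:25 AM:

> Thanks for the reply!
>
> I have also read this document.
>
> A few days ago, while testing job submission with Flink clients 1.9 through 1.12, I found that
> for 1.10+, manually running export HADOOP_CLASSPATH=`hadoop classpath` had no effect. I got all kinds of errors, mostly missing Hadoop classes or methods (NoSuchMethod and the like). Changing the pom file back and forth did not help. In the end, just adding the flink-shaded-hadoop-2-uber*-* dependency to the pom let the job submit and run normally.
>
>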
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
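
A minimal sketch, not from the thread, of one way to check whether a classpath string is complete: it lists the entries whose file or directory does not exist on the local machine. The function name missing_classpath_entries is hypothetical, and treating a trailing "/*" as a directory wildcard is an assumption about how the entries are written.

```shell
#!/bin/sh
# Print every classpath entry whose file or directory does not exist.
# An entry ending in "/*" is checked by its parent directory.
missing_classpath_entries() {
  printf '%s\n' "$1" | tr ':' '\n' | while IFS= read -r entry; do
    [ -z "$entry" ] && continue
    path=${entry%/\*}              # strip a trailing "/*" wildcard, if any
    [ -e "$path" ] || printf '%s\n' "$entry"
  done
}

# Possible use (assumes the hadoop CLI is on PATH):
#   export HADOOP_CLASSPATH=$(hadoop classpath)
#   missing_classpath_entries "$HADOOP_CLASSPATH"
```

An empty result only shows that the listed locations exist; it does not prove that the jars inside them match the Hadoop version the job was built against, which is the other likely cause of NoSuchMethod errors.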


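Re: Determining the flink-shaded-hadoop-2-uber*-* version

2020-12-14 Thread Jacob
Thanks for the reply!

I have also read this document.

A few days ago, while testing job submission with Flink clients 1.9 through 1.12, I found that
for 1.10+, manually running export HADOOP_CLASSPATH=`hadoop classpath` had no effect. I got all kinds of errors, mostly missing Hadoop classes or methods (NoSuchMethod and the like). Changing the pom file back and forth did not help. In the end, just adding the flink-shaded-hadoop-2-uber*-* dependency to the pom let the job submit and run normally.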




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Determining the flink-shaded-hadoop-2-uber*-* version

2020-12-13 Thread silence
Flink no longer recommends putting the Hadoop jars into lib.

You can load the Hadoop dependencies with
export HADOOP_CLASSPATH=`hadoop classpath`

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Determining the flink-shaded-hadoop-2-uber*-* version

2020-12-12 Thread Jacob
When upgrading Flink, this package has to be added to flink/lib, but how do I determine which version of it to use?
flink-shaded-hadoop-2-uber*-*



--
Sent from: http://apache-flink.147419.n8.nabble.com/


How to choose the flink-shaded-hadoop-2-uber version

2020-12-10 Thread 赢峰
How to choose the flink-shaded-hadoop-2-uber version?

What does each part of xxx-xxx mean?





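Re: Re:Re: Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-10-11 Thread Dream-底限
Hi, you can build the image directly on one of the Hadoop nodes: package the required Hadoop dependency jars together with Flink into the Docker image, then set the environment variables and it is ready to use. If the node where Docker is deployed already has Hadoop or Flink, you can also mount them from the host. We currently use the first approach.

Yang Wang wrote on Mon, Oct 12, 2020 at 10:23 AM:

> You only need to start from the community image, add one more layer (copying flink-shaded-hadoop), commit it as a docker
> image, and push it to the docker registry.
>
> For example, the Dockerfile could look like this:
> FROM flink:1.11.1-scala_2.11
> COPY flink-shaded-hadoop-2*.jar /opt/flink/lib/
>
> Also, flink-shaded-hadoop can be downloaded from here [1]
>
> [1].
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
>
>
> Best,
> Yang
>
> yang wrote on Sat, Oct 10, 2020 at 5:50 PM:
>
> > May I ask: does rebuilding the image mean unpacking the original package and then repackaging it?
> >
> >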
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


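Re: Re:Re: Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-10-11 Thread Yang Wang
You only need to start from the community image, add one more layer (copying flink-shaded-hadoop), commit it as a docker
image, and push it to the docker registry.

For example, the Dockerfile could look like this:
FROM flink:1.11.1-scala_2.11
COPY flink-shaded-hadoop-2*.jar /opt/flink/lib/

Also, flink-shaded-hadoop can be downloaded from here [1]

[1].
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2


Best,
Yang

yang wrote on Sat, Oct 10, 2020 at 5:50 PM:

> May I ask: does rebuilding the image mean unpacking the original package and then repackaging it?
>
>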
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:Re: Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-10-10 Thread yang
May I ask how you rebuilt the image? Did you unpack the original jar and then repackage it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:Re: Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-10-10 Thread yang
May I ask: does rebuilding the image mean unpacking the original package and then repackaging it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Issues with Flink Batch and Hadoop dependency

2020-08-31 Thread Arvid Heise
Hi Dan,

Your approach in general is good. You might want to use the bundled hadoop
uber jar [1] to save some time if you find the appropriate version. You can
also build your own version and include it then in lib/.

In general, I'd recommend moving away from sequence files. As soon as you
change your records even minimally, everything falls apart. Going with
established binary formats like Avro or Parquet is usually preferable, also
because of the additional tooling, and pays off quickly in the long run.

[1] https://flink.apache.org/downloads.html#additional-components

On Sat, Aug 29, 2020 at 10:50 PM Dan Hill  wrote:

> I was able to get a basic version to work by including a bunch of hadoop
> and s3 dependencies in the job jar and hacking in some hadoop config
> values.  It's probably not optimal but it looks like I'm unblocked.
>
> On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:
>
>> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
>> debugging and haven't been able to figure it out.  Any help would be
>> greatly appreciated.
>>
>>
>> *Problem*
>> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
>> I try to create a Flink Batch job to read these Sequence files, I get the
>> following error:
>>
>> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>>
>> It fails on this readSequenceFile.
>>
>> env.createInput(HadoopInputs.readSequenceFile(Text.class,
>> ByteWritable.class, INPUT_FILE))
>>
>> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
>> job, I get the following error when trying to run the job:
>>
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>> at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
>> at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
>> at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
>> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
>> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
>>
>>
>> *Extra context*
>> I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink>
>> for creating Flink.  I'm using v1.10.1.
>>
>>
>> *Questions*
>> Are there any existing projects that read batch Hadoop file formats from
>> S3?
>>
>> I've looked at these instructions for Hadoop Integration
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>.
>> I'm assuming my configuration is wrong.  I'm also assuming I need the
>> hadoop dependency properly setup in the jobmanager and taskmanager (not in
>> the job itself).  If I use this Helm chart, do I need to download a hadoop
>> common jar into the Flink images for jobmanager and taskmanager?  Are there
>> pre-built images which I can use that already have the dependencies setup?
>>
>>
>> - Dan
>>
>

-- 

Arvid Heise | Senior Java Developer



Re: Issues with Flink Batch and Hadoop dependency

2020-08-29 Thread Dan Hill
I was able to get a basic version to work by including a bunch of hadoop
and s3 dependencies in the job jar and hacking in some hadoop config
values.  It's probably not optimal but it looks like I'm unblocked.

On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:

> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
> debugging and haven't been able to figure it out.  Any help would be
> greatly appreciated.
>
>
> *Problem*
> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
> I try to create a Flink Batch job to read these Sequence files, I get the
> following error:
>
> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>
> It fails on this readSequenceFile.
>
> env.createInput(HadoopInputs.readSequenceFile(Text.class,
> ByteWritable.class, INPUT_FILE))
>
> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
> job, I get the following error when trying to run the job:
>
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
> at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
> at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
> at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
>
>
> *Extra context*
> I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink>
> for creating Flink.  I'm using v1.10.1.
>
>
> *Questions*
> Are there any existing projects that read batch Hadoop file formats from
> S3?
>
> I've looked at these instructions for Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>.
> I'm assuming my configuration is wrong.  I'm also assuming I need the
> hadoop dependency properly setup in the jobmanager and taskmanager (not in
> the job itself).  If I use this Helm chart, do I need to download a hadoop
> common jar into the Flink images for jobmanager and taskmanager?  Are there
> pre-built images which I can use that already have the dependencies setup?
>
>
> - Dan
>


Issues with Flink Batch and Hadoop dependency

2020-08-28 Thread Dan Hill
I'm assuming I have a simple, common setup problem.  I've spent 6 hours
debugging and haven't been able to figure it out.  Any help would be
greatly appreciated.


*Problem*
I have a Flink Streaming job setup that writes SequenceFiles in S3.  When I
try to create a Flink Batch job to read these Sequence files, I get the
following error:

NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat

It fails on this readSequenceFile.

env.createInput(HadoopInputs.readSequenceFile(Text.class,
ByteWritable.class, INPUT_FILE))

If I directly depend on org-apache-hadoop/hadoop-mapred when building the
job, I get the following error when trying to run the job:

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)


*Extra context*
I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink> for
creating Flink.  I'm using v1.10.1.


*Questions*
Are there any existing projects that read batch Hadoop file formats from S3?

I've looked at these instructions for Hadoop Integration
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>.
I'm assuming my configuration is wrong.  I'm also assuming I need the
Hadoop dependency properly set up in the jobmanager and taskmanager (not in
the job itself).  If I use this Helm chart, do I need to download a Hadoop
common jar into the Flink images for jobmanager and taskmanager?  Are there
pre-built images which I can use that already have the dependencies set up?
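
One way to test the assumption above (that the Hadoop classes must be visible inside the jobmanager and taskmanager JVMs) is to probe for them by name from code running in that JVM. The following is only a minimal sketch using standard JDK calls: the class name ClasspathProbe and its methods are hypothetical, and the S3A class from hadoop-aws is listed as an assumption about which S3 implementation would be used.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ClasspathProbe {

    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so it is covered here as well
            return false;
        }
    }

    /** Probes each class name in order and records whether it is loadable. */
    public static Map<String, Boolean> probe(List<String> classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isLoadable(name));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "org.apache.hadoop.mapred.FileInputFormat", // from the NoClassDefFoundError in this thread
            "org.apache.hadoop.fs.FileSystem",          // from the stack trace in this thread
            "org.apache.hadoop.fs.s3a.S3AFileSystem");  // assumption: hadoop-aws S3A connector
        probe(names).forEach((name, found) ->
            System.out.println((found ? "found    " : "MISSING  ") + name));
    }
}
```

Running main with the same classpath as the Flink processes prints one line per class; any line marked MISSING points at a jar that is not visible to that JVM.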


- Dan


Re: Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread amen...@163.com
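OK, thanks for the reply.

With the Hive version set to 2.1.1, I chose to add the hive-exec-2.1.1 and
flink-connector-hive_2.11-1.11.1 dependencies to the program, and it can now
operate on Hive tables normally.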

best,
amenhub

 


Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread Rui Li
Hi,

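hive-exec itself does not bundle Hadoop. If Hadoop is being pulled in through
Maven transitive dependencies, you can exclude it when packaging. At runtime
you can use your cluster's Hadoop version rather than the Hadoop version that
Hive itself depends on. For Flink 1.11 you can also consider the officially
provided flink-sql-connector-hive uber jar, which contains all of the Hive
dependencies (the Hadoop dependencies still need to be added separately). For
more details, see the documentation [1][2].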

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes



-- 
Best regards!
Rui Li


Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread amen...@163.com
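A follow-up: after I removed the Hadoop dependencies that hive-exec and other
dependencies bring into the program, the job still fails, so perhaps I have
missed something somewhere. I suspected a dependency conflict because, before
testing the Hive integration, I had submitted the job to YARN and it ran
without errors, which is what led my troubleshooting to Hive. It now looks
like something else may be the cause. Here is part of the exception stack: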

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
... 19 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
... 24 more
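
The ClassCastException in this trace names org.apache.hadoop.hbase.shaded.com.google.protobuf.Message, which suggests that a Hadoop RPC class on the client classpath may have been loaded from a jar that relocates protobuf. A minimal sketch for checking which jar a class is actually loaded from, using only JDK calls; the class name JarLocator and its methods are hypothetical, and the default class name is taken from the stack trace.

```java
import java.security.CodeSource;

public class JarLocator {

    /** Returns the jar or directory a class was loaded from, or a marker string if unknown. */
    public static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader have no code source
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name without initializing it, then reports its location. */
    public static String locationOf(String className) {
        try {
            return locationOf(Class.forName(className, false, JarLocator.class.getClassLoader()));
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Default taken from the stack trace; pass another class name as the first argument
        String name = args.length > 0 ? args[0] : "org.apache.hadoop.ipc.ProtobufRpcEngine";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

Run with the same classpath as the submitting client, this prints the jar that supplies the class, which can then be compared against the jars shipped with the cluster's Hadoop installation.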

best,
amenhub
 
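From: amen...@163.com
Sent: 2020-08-24 20:40
To: user-zh
Subject: Hadoop conflict caused by the hive-exec dependency
Hi everyone,

Component versions: flink-1.11.1, hive-2.1.1

Problem description:
I wrote a real-time kafka2mysql demo that calls executeSql() from the Table
API. Without the hive-exec dependency, the packaged job runs normally when
submitted to the YARN cluster.

When testing HiveCatalog and reading and writing Hive tables, the job runs
without errors on a Standalone Cluster, and Flink reads and writes the Hive
tables normally (no Hadoop dependency conflict occurs).

But submitting to YARN produces a Hadoop conflict. Inspecting the program's
dependencies in IDEA shows that adding hive-exec automatically pulls in Hadoop
and HDFS dependencies at version 2.6.1, which conflict with the Hadoop
dependencies of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).

Has anyone in the community run into this? The docs recommend that, when there
is no officially provided Hive package, you download the hive-exec dependency
for your own version, but doing so implicitly brings in a Hadoop version that
differs from the cluster's, which is bound to cause conflicts. Is there
something I have not set up correctly on my side?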
 
best,
amenhub

