Re: Hadoop dependency configuration issue
Hi fengqi,

The error "Hadoop is not in the classpath/dependencies." means that the classes HDFS needs, such as org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.FileSystem, could not be found. If Hadoop is installed in your environment, the classpath is usually set like this:

export HADOOP_CLASSPATH=`hadoop classpath`

If you are submitting to a local standalone Flink cluster, check the log files Flink generates; they print the classpath, so you can verify whether the Hadoop-related classes are on it.

Best,
Biao Geng

ha.fen...@aisino.com wrote on Tue, Apr 2, 2024, 10:24:

> 1. In the development environment, with the hadoop-client dependency added, the HDFS path is reachable during checkpointing.
> 2. Flink 1.19.0, Hadoop 3.3.1; submitting the jar to a standalone Flink cluster reports the following error:
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
> ... 3 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:364)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:273)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:503)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:334)
> at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:173)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:381)
> at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:224)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
> at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162)
> at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
> at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 3 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme '
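Biao Geng's suggestion can be turned into a quick diagnostic. A minimal sketch: the sample classpath string below is hypothetical and stands in for the output of `hadoop classpath` (or for the "Classpath:" line in the JobManager log); on a real machine you would run `export HADOOP_CLASSPATH=$(hadoop classpath)` before starting the standalone cluster.

```shell
# Hypothetical stand-in for `hadoop classpath` output:
sample_classpath="/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/hadoop-common-3.3.1.jar:/opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-3.3.1.jar"

# The same grep works against the "Classpath:" line in the Flink logs.
if printf '%s' "$sample_classpath" | grep -q 'hadoop-common'; then
  echo "hadoop on classpath"
else
  echo "hadoop missing: hdfs:// paths will fail with UnsupportedFileSystemSchemeException"
fi
```

If the grep finds nothing in the JobManager log's classpath line, the `HADOOP_CLASSPATH` export was not visible to the process that started the cluster.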
Re: [DISCUSS] Hadoop 2 vs Hadoop 3 usage
I could share some metrics about Alibaba Cloud EMR clusters. The ratio of Hadoop 2 vs Hadoop 3 is 1:3.

Best,
Yang

On Thu, Dec 28, 2023 at 8:16 PM Martijn Visser wrote:

> Hi all,
>
> I want to get some insights on how many users are still using Hadoop 2 vs how many users are using Hadoop 3. Flink currently requires a minimum version of Hadoop 2.10.2 for certain features, but also extensively uses Hadoop 3 (like for the file system implementations).
>
> Hadoop 2 has a large number of direct and indirect vulnerabilities [1]. Most of them can only be resolved by dropping support for Hadoop 2 and upgrading to a Hadoop 3 version. This thread is primarily to get more insights if Hadoop 2 is still commonly used, or if we can actually discuss dropping support for Hadoop 2 in Flink.
>
> Best regards,
>
> Martijn
>
> [1] https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/2.10.2
[DISCUSS] Hadoop 2 vs Hadoop 3 usage
Hi all,

I want to get some insights on how many users are still using Hadoop 2 vs how many users are using Hadoop 3. Flink currently requires a minimum version of Hadoop 2.10.2 for certain features, but also extensively uses Hadoop 3 (like for the file system implementations).

Hadoop 2 has a large number of direct and indirect vulnerabilities [1]. Most of them can only be resolved by dropping support for Hadoop 2 and upgrading to a Hadoop 3 version. This thread is primarily to get more insights if Hadoop 2 is still commonly used, or if we can actually discuss dropping support for Hadoop 2 in Flink.

Best regards,

Martijn

[1] https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/2.10.2
Re: Hadoop Error on ECS Fargate
Hi Mengxi Wang,

Which Flink version are you using?

Best regards,

Martijn

On Thu, Jul 13, 2023 at 3:21 PM Wang, Mengxi X via user <user@flink.apache.org> wrote:

> Hi community,
>
> We got this Kerberos error with Hadoop as the file system on an ECS Fargate deployment.
>
> Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
>
> Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
>
> We don't actually need Kerberos authentication, so I've added properties to disable Hadoop Kerberos authentication to flink-config.yaml, and I can see from the logs that they've been picked up. But still the errors persist. Can anybody help please?
>
> Best wishes,
>
> Mengxi Wang
>
> This message is confidential and subject to terms at: https://www.jpmorgan.com/emaildisclaimer including on confidential, privileged or legal entity information, malicious content and monitoring of electronic messages. If you are not the intended recipient, please delete this message and notify the sender immediately. Any unauthorized use is strictly prohibited.
Hadoop Error on ECS Fargate
Hi community,

We got this Kerberos error with Hadoop as the file system on an ECS Fargate deployment.

Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name

Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name

We don't actually need Kerberos authentication, so I've added properties to disable Hadoop Kerberos authentication to flink-config.yaml, and I can see from the logs that they've been picked up. But still the errors persist. Can anybody help please?

Best wishes,

Mengxi Wang
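For readers hitting the same trace: the "invalid null input: name" NullPointerException typically comes from Hadoop's JAAS UnixLoginModule when the container process runs as a UID with no resolvable user name, which is common on Fargate; in that case the fix is giving the process a user name rather than only changing the auth mode. A hedged sketch of settings that commonly come up (whether they resolve this exact case is an assumption):

```yaml
# flink-conf.yaml sketch -- assumptions, not a verified fix.
# Forward a "simple" (non-Kerberos) auth mode to the Hadoop configuration
# via Flink's flink.hadoop.* pass-through prefix:
flink.hadoop.hadoop.security.authentication: simple

# In the ECS task / container definition, also consider an explicit user name
# (value "flink" is hypothetical):
#   environment:
#     - name: HADOOP_USER_NAME
#       value: flink
```

If the UID of the container user has no `/etc/passwd` entry in the image, adding one (or running as a named user) addresses the same root cause.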
Re: fail to mount hadoop-config-volume when using flink-k8s-operator
Currently, exporting the "HADOOP_CONF_DIR" env variable only works for the native K8s integration. The Flink client will try to create the hadoop-config-volume automatically if the Hadoop env is found.

If you want to set HADOOP_CONF_DIR in the docker image, please also make sure the specified Hadoop conf directory exists in the image.

For flink-k8s-operator, another feasible solution is to create a Hadoop config ConfigMap manually and then use "kubernetes.hadoop.conf.config-map.name" to mount it to the JobManager and TaskManager pods.

Best,
Yang

Liting Liu (litiliu) wrote on Wed, Oct 12, 2022, 16:11:

> Hi, community:
> I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The "HADOOP_CONF_DIR" environment variable was set in the image that I built from flink:1.15. I found the taskmanager pod was trying to mount a volume named "hadoop-config-volume" from a ConfigMap, but the ConfigMap with the name "hadoop-config-volume" wasn't created.
>
> Do I need to remove the "HADOOP_CONF_DIR" environment variable in the dockerfile? If yes, what should I do to specify the Hadoop conf?
fail to mount hadoop-config-volume when using flink-k8s-operator
Hi, community:

I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The "HADOOP_CONF_DIR" environment variable was set in the image that I built from flink:1.15. I found the taskmanager pod was trying to mount a volume named "hadoop-config-volume" from a ConfigMap, but the ConfigMap with the name "hadoop-config-volume" wasn't created.

Do I need to remove the "HADOOP_CONF_DIR" environment variable in the dockerfile? If yes, what should I do to specify the Hadoop conf?
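The manual-ConfigMap route Yang describes might look like the following sketch; the ConfigMap name, deployment name, and conf path are hypothetical, so adjust them to your cluster.

```yaml
# First create the ConfigMap from an existing Hadoop conf directory, e.g.:
#   kubectl create configmap hadoop-config --from-file=/opt/hadoop/etc/hadoop
# Then point the operator-managed deployment at it:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job
spec:
  flinkConfiguration:
    kubernetes.hadoop.conf.config-map.name: hadoop-config
```

With this in place, the image no longer needs to export HADOOP_CONF_DIR itself.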
Setting boundedness for legacy Hadoop sequence file sources
Hi all,

I'm converting several batch Flink workflows to streaming, with bounded sources. Some of our sources are reading Hadoop sequence files via StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource is returning true, because the LegacySourceTransformation for this source says it's CONTINUOUS_UNBOUNDED. So the workflow fails to run, because I've set the execution mode to batch.

The root cause is that StreamExecutionEnvironment.createInput() checks if the input format extends FileInputFormat, and only sets up a bounded source if it does. HadoopInputFormat doesn't extend FileInputFormat, so boundedness gets set to CONTINUOUS_UNBOUNDED, which is wrong.

This looks like a bug in StreamExecutionEnvironment.createInput(), though I'm not sure how best to fix it. Relying on class checks feels brittle.

Regards,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
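The brittle class check Ken describes can be illustrated with a simplified stand-in. These are not Flink's actual classes; the names only mirror the real ones so the misclassification is visible.

```java
// Simplified stand-in for the instanceof-based boundedness decision -- NOT
// Flink's actual code, just the shape of the check in createInput().
public class BoundednessSketch {
    interface InputFormat {}
    static class FileInputFormat implements InputFormat {}
    // File-based (and therefore bounded), but deliberately outside the
    // FileInputFormat hierarchy, like Flink's HadoopInputFormat wrapper:
    static class HadoopInputFormat implements InputFormat {}

    // Anything that doesn't extend FileInputFormat is assumed to be
    // CONTINUOUS_UNBOUNDED, even when it is actually bounded.
    static String boundednessOf(InputFormat format) {
        return (format instanceof FileInputFormat) ? "BOUNDED" : "CONTINUOUS_UNBOUNDED";
    }

    public static void main(String[] args) {
        System.out.println(boundednessOf(new FileInputFormat()));   // BOUNDED
        System.out.println(boundednessOf(new HadoopInputFormat())); // CONTINUOUS_UNBOUNDED
    }
}
```

The second call is exactly the misclassification that makes batch execution mode reject the job.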
Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator
Hi Fil,

I've replied in the JIRA.

Cheers,
Igal.

On Tue, Mar 8, 2022 at 6:08 PM Filip Karnicki wrote:

> Hi Roman, Igal (@ below)
>
> Thank you for your answer. I don't think I'll have access to Flink's lib folder given it's a shared Cloudera cluster. The only thing I could think of is to not include com.google.protobuf in the classloader.parent-first-patterns.additional setting, and include protobuf-java 3.7.1 in the uber jar.
>
> I created a jira for this just now + a discuss thread on the dev group https://issues.apache.org/jira/browse/FLINK-26537
>
> Hi @Igal Shilman, is the plugin solution outlined by Roman something that fits in better with Statefun than having the creators of uber .jars be responsible for using a statefun-compatible protobuf-java?
>
> Kind regards
> Fil
>
> On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan wrote:
>
>> Hi Filip,
>>
>> Have you tried putting protobuf-java 3.7.1 into Flink's lib/ folder? Or maybe re-writing the dependencies you mentioned to be loaded as plugins? [1]
>>
>> I don't see any other ways to solve this problem. Probably Chesnay or Seth will suggest a better solution.
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki wrote:
>> >
>> > Hi All!
>> >
>> > We're running a statefun uber jar on a shared Cloudera Flink cluster, the latter of which launches with some ancient protobuf dependencies because of reasons[1].
>> >
>> > Setting the following flink-config settings on the entire cluster
>> >
>> > classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>> >
>> > causes these old protobuf dependencies to get loaded over statefun's protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>> >
>> > We hacked together a version of statefun that doesn't perform the check whether the classloader settings contain the three patterns from above, and as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf pattern is not present in the classloader.parent-first-patterns.additional setting, then all is well.
>> >
>> > Aside from removing old hadoop from the classpath, which may not be possible given that it's a shared cluster, is there anything we can do other than adding a configurable override not to perform the config check in StatefulFunctionsConfigValidator to an upcoming statefun core release?
>> >
>> > Many thanks
>> > Fil
>> >
>> > [1] We're still trying to find out if it's absolutely necessary to have these on the classpath.
Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator
Hi Roman, Igal (@ below)

Thank you for your answer. I don't think I'll have access to Flink's lib folder given it's a shared Cloudera cluster. The only thing I could think of is to not include com.google.protobuf in the classloader.parent-first-patterns.additional setting, and include protobuf-java 3.7.1 in the uber jar.

I created a jira for this just now + a discuss thread on the dev group https://issues.apache.org/jira/browse/FLINK-26537

Hi @Igal Shilman, is the plugin solution outlined by Roman something that fits in better with Statefun than having the creators of uber .jars be responsible for using a statefun-compatible protobuf-java?

Kind regards
Fil

On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan wrote:

> Hi Filip,
>
> Have you tried putting protobuf-java 3.7.1 into Flink's lib/ folder? Or maybe re-writing the dependencies you mentioned to be loaded as plugins? [1]
>
> I don't see any other ways to solve this problem. Probably Chesnay or Seth will suggest a better solution.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
> Regards,
> Roman
>
> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki wrote:
> >
> > Hi All!
> >
> > We're running a statefun uber jar on a shared Cloudera Flink cluster, the latter of which launches with some ancient protobuf dependencies because of reasons[1].
> >
> > Setting the following flink-config settings on the entire cluster
> >
> > classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> >
> > causes these old protobuf dependencies to get loaded over statefun's protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
> >
> > We hacked together a version of statefun that doesn't perform the check whether the classloader settings contain the three patterns from above, and as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf pattern is not present in the classloader.parent-first-patterns.additional setting, then all is well.
> >
> > Aside from removing old hadoop from the classpath, which may not be possible given that it's a shared cluster, is there anything we can do other than adding a configurable override not to perform the config check in StatefulFunctionsConfigValidator to an upcoming statefun core release?
> >
> > Many thanks
> > Fil
> >
> > [1] We're still trying to find out if it's absolutely necessary to have these on the classpath.
Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator
Hi Filip,

Have you tried putting protobuf-java 3.7.1 into Flink's lib/ folder? Or maybe re-writing the dependencies you mentioned to be loaded as plugins? [1]

I don't see any other ways to solve this problem. Probably Chesnay or Seth will suggest a better solution.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

Regards,
Roman

On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki wrote:

> Hi All!
>
> We're running a statefun uber jar on a shared Cloudera Flink cluster, the latter of which launches with some ancient protobuf dependencies because of reasons[1].
>
> Setting the following flink-config settings on the entire cluster
>
> classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
> causes these old protobuf dependencies to get loaded over statefun's protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
>
> We hacked together a version of statefun that doesn't perform the check whether the classloader settings contain the three patterns from above, and as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf pattern is not present in the classloader.parent-first-patterns.additional setting, then all is well.
>
> Aside from removing old hadoop from the classpath, which may not be possible given that it's a shared cluster, is there anything we can do other than adding a configurable override not to perform the config check in StatefulFunctionsConfigValidator to an upcoming statefun core release?
>
> Many thanks
> Fil
>
> [1] We're still trying to find out if it's absolutely necessary to have these on the classpath.
[statefun] hadoop dependencies and StatefulFunctionsConfigValidator
Hi All!

We're running a statefun uber jar on a shared Cloudera Flink cluster, the latter of which launches with some ancient protobuf dependencies because of reasons[1].

Setting the following flink-config settings on the entire cluster

classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf

causes these old protobuf dependencies to get loaded over statefun's protobuf-java 3.7.1, and NoSuchMethod exceptions occur.

We hacked together a version of statefun that doesn't perform the check whether the classloader settings contain the three patterns from above, and as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf pattern is not present in the classloader.parent-first-patterns.additional setting, then all is well.

Aside from removing old hadoop from the classpath, which may not be possible given that it's a shared cluster, is there anything we can do other than adding a configurable override not to perform the config check in StatefulFunctionsConfigValidator to an upcoming statefun core release?

Many thanks
Fil

[1] We're still trying to find out if it's absolutely necessary to have these on the classpath.
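The config check Fil wants to make overridable can be sketched with a simplified stand-in for StatefulFunctionsConfigValidator's pattern validation; this is the shape of the check, not the actual implementation.

```java
// Simplified stand-in (NOT the real StatefulFunctionsConfigValidator): the
// validator requires all three patterns in parent-first-patterns.additional.
import java.util.Arrays;
import java.util.List;

public class ValidatorSketch {
    static final List<String> REQUIRED_PARENT_FIRST = Arrays.asList(
            "org.apache.flink.statefun", "org.apache.kafka", "com.google.protobuf");

    // True only if every required pattern appears in the configured
    // semicolon-separated classloader.parent-first-patterns.additional value.
    static boolean parentFirstPatternsOk(String configuredValue) {
        List<String> configured = Arrays.asList(configuredValue.split(";"));
        return configured.containsAll(REQUIRED_PARENT_FIRST);
    }

    public static void main(String[] args) {
        // Fil's workaround (dropping com.google.protobuf) fails this check:
        System.out.println(parentFirstPatternsOk(
                "org.apache.flink.statefun;org.apache.kafka"));                       // false
        System.out.println(parentFirstPatternsOk(
                "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"));   // true
    }
}
```

This is why running without com.google.protobuf in the pattern list requires patching out the check today: the validator has no flag to relax it.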
Re: [DISCUSS] Changing the minimal supported version of Hadoop
As there were no strong objections, we'll proceed with bumping the Hadoop version to 2.8.5 and removing the safeguards and the CI for any earlier versions. This will effectively make Hadoop 2.8.5 the lowest supported version in Flink 1.15.

Best,
D.

On Thu, Dec 23, 2021 at 11:03 AM Till Rohrmann wrote:

> If there are no users strongly objecting to dropping Hadoop support for < 2.8, then I am +1 for this, since otherwise we won't gain a lot, as Xintong said.
>
> Cheers,
> Till
>
> On Wed, Dec 22, 2021 at 10:33 AM David Morávek wrote:
>
>> Agreed, if we drop the CI for lower versions, there is actually no point in having safeguards, as we can't really test for them.
>>
>> Maybe one more thought (it's more of a feeling): I feel that users running really old Hadoop versions are usually slower to adopt (they most likely use what their current HDP / CDH version offers) and they are less likely to use Flink 1.15 any time soon, but I don't have any strong data to support this.
>>
>> D.
Re: [DISCUSS] Changing the minimal supported version of Hadoop
If there are no users strongly objecting to dropping Hadoop support for < 2.8, then I am +1 for this, since otherwise we won't gain a lot, as Xintong said.

Cheers,
Till

On Wed, Dec 22, 2021 at 10:33 AM David Morávek wrote:

> Agreed, if we drop the CI for lower versions, there is actually no point in having safeguards, as we can't really test for them.
>
> Maybe one more thought (it's more of a feeling): I feel that users running really old Hadoop versions are usually slower to adopt (they most likely use what their current HDP / CDH version offers) and they are less likely to use Flink 1.15 any time soon, but I don't have any strong data to support this.
>
> D.
Re: [DISCUSS] Changing the minimal supported version of Hadoop
Agreed, if we drop the CI for lower versions, there is actually no point in having safeguards, as we can't really test for them.

Maybe one more thought (it's more of a feeling): I feel that users running really old Hadoop versions are usually slower to adopt (they most likely use what their current HDP / CDH version offers) and they are less likely to use Flink 1.15 any time soon, but I don't have any strong data to support this.

D.
Re: [DISCUSS] Changing the minimal supported version of Hadoop
Sorry to join the discussion late.

+1 for dropping support for Hadoop versions < 2.8 from my side.

TBH, wrapping the reflection-based logic with safeguards sounds a bit neither fish nor fowl to me. It weakens the major benefits that we look for by dropping support for early versions.

- The codebase is simplified, but not significantly. We still have the complexity of understanding which APIs may not exist in early versions.
- Without CI, we provide no guarantee that Flink will still work with early Hadoop versions. Or otherwise we fail to simplify the CI.

I'd suggest we say we no longer support Hadoop versions < 2.8 at all. And if that is not permitted by our users, we may consider keeping the codebase as is and waiting a bit longer. WDYT?

Thank you~

Xintong Song

[1] https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility

On Wed, Dec 22, 2021 at 12:52 AM David Morávek wrote:

> CC user@f.a.o
>
> Is anyone aware of something that blocks us from doing the upgrade?
>
> D.
>
> On Tue, Dec 21, 2021 at 5:50 PM David Morávek wrote:
>
>> Hi Martijn,
>>
>> from personal experience, most Hadoop users are lagging behind the release lines by a lot, because upgrading a Hadoop cluster is not really a simple task to achieve. I think for now, we can stay a bit conservative; nothing blocks us from using 2.8.5 as we don't use any "newer" APIs in the code.
>>
>> As for Till's concern, we can still wrap the reflection-based logic, to be skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do now.
>>
>> D.
>>
>> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser wrote:
>>
>>> Hi David,
>>>
>>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1] https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>>
>>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann wrote:
>>>
>>>> Hi David,
>>>>
>>>> I think we haven't updated our Hadoop dependencies in a long time. Hence, it is probably time to do so. So +1 for upgrading to the latest patch release.
>>>>
>>>> If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then I don't see a problem with dropping support for pre-bundled Hadoop versions < 2.8. This could indeed help us decrease our build matrix a bit and, thus, save some build time.
>>>>
>>>> Concerning simplifying our code base to get rid of reflection logic etc., we still might have to add a safeguard for features that are not supported by earlier versions. According to the docs
>>>>
>>>> > YARN applications that attempt to use new APIs (including new fields in data structures) that have not yet been deployed to the cluster can expect link exceptions
>>>>
>>>> we can see link exceptions. We could get around this by saying that Flink no longer supports Hadoop < 2.8. But this should be checked with our users on the user ML at least.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Dec 14, 2021 at 9:25 AM David Morávek wrote:
>>>>
>>>> > Hi,
>>>> >
>>>> > I'd like to start a discussion about upgrading the minimal Hadoop version that Flink supports.
>>>> >
>>>> > Even though the default value for the `hadoop.version` property is set to 2.8.3, we're still ensuring both runtime and compile compatibility with Hadoop 2.4.x with the scheduled pipeline[1].
>>>> >
>>>> > Here is a list of dates of the latest releases for each minor version up to 2.8.x:
>>>> >
>>>> > - Hadoop 2.4.1: Last commit on 6/30/2014
>>>> > - Hadoop 2.5.2: Last commit on 11/15/2014
>>>> > - Hadoop 2.6.5: Last commit on 10/11/2016
>>>> > - Hadoop 2.7.7: Last commit on 7/18/2018
>>>> > - Hadoop 2.8.5: Last commit on
Re: [DISCUSS] Changing the minimal supported version of Hadoop
CC user@f.a.o

Is anyone aware of something that blocks us from doing the upgrade?

D.

On Tue, Dec 21, 2021 at 5:50 PM David Morávek wrote:

> Hi Martijn,
>
> from personal experience, most Hadoop users are lagging behind the release lines by a lot, because upgrading a Hadoop cluster is not really a simple task to achieve. I think for now, we can stay a bit conservative; nothing blocks us from using 2.8.5 as we don't use any "newer" APIs in the code.
>
> As for Till's concern, we can still wrap the reflection-based logic, to be skipped in case of "NoClassDefFound" instead of "ClassNotFound" as we do now.
>
> D.
>
> On Tue, Dec 14, 2021 at 5:23 PM Martijn Visser wrote:
>
>> Hi David,
>>
>> Thanks for bringing this up for discussion! Given that Hadoop 2.8 is considered EOL, shouldn't we bump the version to Hadoop 2.10? [1]
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Active+Release+Lines
>>
>> On Tue, 14 Dec 2021 at 10:28, Till Rohrmann wrote:
>>
>>> Hi David,
>>>
>>> I think we haven't updated our Hadoop dependencies in a long time. Hence, it is probably time to do so. So +1 for upgrading to the latest patch release.
>>>
>>> If newer 2.x Hadoop versions are compatible with 2.y with x >= y, then I don't see a problem with dropping support for pre-bundled Hadoop versions < 2.8. This could indeed help us decrease our build matrix a bit and, thus, save some build time.
>>>
>>> Concerning simplifying our code base to get rid of reflection logic etc., we still might have to add a safeguard for features that are not supported by earlier versions. According to the docs
>>>
>>> > YARN applications that attempt to use new APIs (including new fields in data structures) that have not yet been deployed to the cluster can expect link exceptions
>>>
>>> we can see link exceptions. We could get around this by saying that Flink no longer supports Hadoop < 2.8. But this should be checked with our users on the user ML at least.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 9:25 AM David Morávek wrote:
>>>
>>> > Hi,
>>> >
>>> > I'd like to start a discussion about upgrading the minimal Hadoop version that Flink supports.
>>> >
>>> > Even though the default value for the `hadoop.version` property is set to 2.8.3, we're still ensuring both runtime and compile compatibility with Hadoop 2.4.x with the scheduled pipeline[1].
>>> >
>>> > Here is a list of dates of the latest releases for each minor version up to 2.8.x:
>>> >
>>> > - Hadoop 2.4.1: Last commit on 6/30/2014
>>> > - Hadoop 2.5.2: Last commit on 11/15/2014
>>> > - Hadoop 2.6.5: Last commit on 10/11/2016
>>> > - Hadoop 2.7.7: Last commit on 7/18/2018
>>> > - Hadoop 2.8.5: Last commit on 9/8/2018
>>> >
>>> > Since then there were two more minor releases in the 2.x branch and four more minor releases in the 3.x branch.
>>> >
>>> > Supporting the older versions involves reflection-based "hacks" for supporting multiple versions.
>>> >
>>> > My proposal would be changing the minimum supported version *to 2.8.5*. This should simplify the Hadoop-related codebase and simplify the CI build infrastructure, as we won't have to test for the older versions.
>>> >
>>> > Please note that this only involves minimal *client side* compatibility. The wire protocol should remain compatible with earlier versions [2], so we should be able to talk with any servers in the 2.x major branch.
>>> >
>>> > One small note for the 2.8.x branch: some of the classes we need are only available in version 2.8.4 and above, but I'm not sure we should take an eventual need for upgrading a patch version into consideration here, because both 2.8.4 and 2.8.5 are pretty old.
>>> >
>>> > WDYT, is it already time to upgrade? Looking forward to any thoughts on the topic!
>>> >
>>> > [1] https://github.com/apache/flink/blob/release-1.14.0/tools/azure-pipelines/build-apache-repo.yml#L123
>>> > [2] https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/Compatibility.html#Wire_compatibility
>>> >
>>> > Best,
>>> > D.
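The reflection-based guard discussed in this thread (probe for a class that only exists in newer Hadoop versions, and skip the feature otherwise, tolerating NoClassDefFoundError as well as ClassNotFoundException) can be sketched like this; the probed class names below are illustrative, not Flink's real probes.

```java
// Sketch of the reflection guard pattern: detect whether an optional API is
// present on the classpath before using it.
public class HadoopFeatureProbe {
    static boolean hasClass(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            // Older Hadoop on the classpath: the API is absent, skip the feature.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasClass("java.util.Optional"));            // true on any modern JVM
        System.out.println(hasClass("org.example.hadoop.MissingApi")); // false: hypothetical class
    }
}
```

Dropping support for < 2.8 is what lets this probe-and-branch code be deleted outright.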
Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options
Hi Timothy,

The issue would require refactoring the FileSystems abstraction to allow multiple FileSystem objects of the same FileSystem type to be configured differently. While this would improve the code quality and enable such use cases, I currently have no capacity to work on it or guide it. If you are interested in working on it, I can try to find someone to shepherd.

On Mon, Dec 13, 2021 at 7:13 PM Timothy James wrote:

> Thank you Timo. Hi Arvid!
>
> I note that that ticket proposes two alternatives for a solution. The first
>
> > Either we allow a properties map similar to Kafka or Kinesis properties to our connectors.
>
> seems to solve our problem. The second, much more detailed, appears unrelated to our needs:
>
> > Or something like:
> > Management of two properties related S3 Object management:
> > ...
>
> The ticket is unassigned and has been open for more than a year. It looks like you increased the ticket priority, thank you.
>
> Tim
>
> On Mon, Dec 13, 2021 at 6:52 AM Timo Walther wrote:
>
>> Hi Timothy,
>>
>> unfortunately, this is not supported yet. However, the effort will be tracked under the following ticket:
>>
>> https://issues.apache.org/jira/browse/FLINK-19589
>>
>> I will loop in Arvid (in CC), who might help you in contributing the missing functionality.
>>
>> Regards,
>> Timo
>>
>> On 10.12.21 23:48, Timothy James wrote:
>> > Hi,
>> >
>> > The Hadoop s3a library itself supports some properties we need, but the "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass connector options for these to the "Hadoop/Presto S3 File Systems plugins" (via S3FileSystemFactory).
>> >
>> > Instead, only job-global Flink config values are passed to Hadoop s3a. That won't work for us: we need to vary these values per Flink SQL table, and not override our config for other uses of S3 (such as Flink checkpointing).
>> >
>> > Contrast this with the Kafka connector, which supports an analogous "properties.*" prefixed pass-through mechanism, and the Kinesis connector, which supports all the specific properties we would need out of the box.
>> >
>> > Our current intent is to alter FileSystemTableFactory to follow the "properties.*" approach used by the Kafka connector.
>> >
>> > ➡️ Our questions for you: ⬅️
>> > - Know of anything like this? Anybody solved this?
>> > - Know of anything that's going to break this approach?
>> > - What are we missing?
>> >
>> > For context, our particular use case requires options like:
>> > - fs.s3a.assumed.role.arn
>> > - fs.s3a.aws.credentials.provider (or some other mechanism to pass externalId)
>> >
>> > We imagine there would be other use cases for this, and if we build it ourselves there's the possibility of contributing it to the Flink repo for everybody.
>> >
>> > Relevant documentation:
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
>> > - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>> > - https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties
>> > - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid
>> >
>> > Thank you!
>> >
>> > Tim James
>> > Decodable.co
Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options
Thank you Timo. Hi Arvid! I note that that ticket proposes two alternative solutions. The first > Either we allow a properties map similar to Kafka or Kinesis properties to our connectors. seems to solve our problem. The second, much more detailed, appears unrelated to our needs: > Or something like: > Management of two properties related S3 Object management: > ... The ticket is unassigned and has been open for more than a year. It looks like you increased the ticket priority, thank you. Tim On Mon, Dec 13, 2021 at 6:52 AM Timo Walther wrote: > Hi Timothy, > > unfortunetaly, this is not supported yet. However, the effort will be > tracked under the following ticket: > > https://issues.apache.org/jira/browse/FLINK-19589 > > I will loop-in Arvid (in CC) which might help you in contributing the > missing functioniality. > > Regards, > Timo > > > On 10.12.21 23:48, Timothy James wrote: > > Hi, > > > > The Hadoop s3a library itself supports some properties we need, but the > > "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass > > connector options for these to the "Hadoop/Presto S3 File Systems > > plugins" (via S3FileSystemFactory). > > > > Instead, only Job-global Flink config values are passed to Hadoop s3a. > > That won't work for us: we need to vary these values per Flink SQL > > table, and not override our config for other use of S3 (such as Flink > > checkpointing). > > > > Contrast this with the Kafka connector, which supports an analogous > > "properties.*" prefixed pass-through mechanism, and the Kinesis > > connector, which supports all the specific properties we would need out > > of the box. > > > > Our current intent is to alter FileSystemTableFactory to follow the > > "properties.*" approach used by the Kafka connector. > > > > *** ➡️ Our questions for you: ⬅️ > > - Know of anything like this? Anybody solved this? > > - Know of anything that's going to break this approach? > > - What are we missing? 
> > > > For context, our particular use case requires options like: > > - fs.s3a.assumed.role.arn > > - fs.s3a.aws.credentials.provider, (or some other mechanism to pass > > externalId) > > > > We imagine there would be other use cases for this, and if we build it > > ourselves there's the possibility of contributing it to the Flink repo > > for everybody. > > > > Relevant documentation: > > - > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/ > > < > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/ > > > > - > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins > > < > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins > > > > - > > > https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html > > < > https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html > > > > - > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties > > < > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties > > > > - > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid > > < > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid > > > > > > Thank you! > > > > Tim James > > Decodable.co > > > >
Re: Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options
Hi Timothy, unfortunately, this is not supported yet. However, the effort will be tracked under the following ticket: https://issues.apache.org/jira/browse/FLINK-19589 I will loop in Arvid (in CC), who might help you in contributing the missing functionality. Regards, Timo On 10.12.21 23:48, Timothy James wrote: Hi, The Hadoop s3a library itself supports some properties we need, but the "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass connector options for these to the "Hadoop/Presto S3 File Systems plugins" (via S3FileSystemFactory). Instead, only Job-global Flink config values are passed to Hadoop s3a. That won't work for us: we need to vary these values per Flink SQL table, and not override our config for other use of S3 (such as Flink checkpointing). Contrast this with the Kafka connector, which supports an analogous "properties.*" prefixed pass-through mechanism, and the Kinesis connector, which supports all the specific properties we would need out of the box. Our current intent is to alter FileSystemTableFactory to follow the "properties.*" approach used by the Kafka connector. *** ➡️ Our questions for you: ⬅️ - Know of anything like this? Anybody solved this? - Know of anything that's going to break this approach? - What are we missing? For context, our particular use case requires options like: - fs.s3a.assumed.role.arn - fs.s3a.aws.credentials.provider, (or some other mechanism to pass externalId) We imagine there would be other use cases for this, and if we build it ourselves there's the possibility of contributing it to the Flink repo for everybody. 
Relevant documentation: - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/ <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/> - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins> - https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html> - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties> - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid> Thank you! Tim James Decodable.co
Passing arbitrary Hadoop s3a properties from FileSystem SQL Connector options
Hi, The Hadoop s3a library itself supports some properties we need, but the "FileSystem SQL Connector" (via FileSystemTableFactory) does not pass connector options for these to the "Hadoop/Presto S3 File Systems plugins" (via S3FileSystemFactory). Instead, only Job-global Flink config values are passed to Hadoop s3a. That won't work for us: we need to vary these values per Flink SQL table, and not override our config for other use of S3 (such as Flink checkpointing). Contrast this with the Kafka connector, which supports an analogous "properties.*" prefixed pass-through mechanism, and the Kinesis connector, which supports all the specific properties we would need out of the box. Our current intent is to alter FileSystemTableFactory to follow the "properties.*" approach used by the Kafka connector. *** ➡️ Our questions for you: ⬅️ - Know of anything like this? Anybody solved this? - Know of anything that's going to break this approach? - What are we missing? For context, our particular use case requires options like: - fs.s3a.assumed.role.arn - fs.s3a.aws.credentials.provider, (or some other mechanism to pass externalId) We imagine there would be other use cases for this, and if we build it ourselves there's the possibility of contributing it to the Flink repo for everybody. Relevant documentation: - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/ - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins - https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#properties - https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kinesis/#aws-credentials-role-externalid Thank you! Tim James Decodable.co
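The "properties.*" pass-through proposed above amounts to simple prefix filtering over the table options before handing them to the s3a layer. A minimal, self-contained sketch of that mechanism — the class and method names here are invented for illustration and are not Flink's actual FileSystemTableFactory code:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the "properties.*" pass-through idea: connector options carrying
 * a "properties." prefix are stripped of the prefix and forwarded to the
 * underlying Hadoop s3a configuration; everything else stays with the
 * connector. Mirrors what the Kafka connector does for its client properties.
 */
public class PropertiesPassThrough {

    private static final String PREFIX = "properties.";

    /** Extracts prefixed options and strips the prefix. */
    public static Map<String, String> extractHadoopOptions(Map<String, String> tableOptions) {
        Map<String, String> hadoopOptions = new HashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoopOptions.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hadoopOptions;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "filesystem");
        options.put("properties.fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/example");
        // Only the prefixed key survives, with the prefix removed.
        System.out.println(extractHadoopOptions(options));
    }
}
```

With this shape, a per-table option such as `'properties.fs.s3a.assumed.role.arn'` would reach s3a without touching the job-global config used for checkpointing.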
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Hey Ingo, Thanks for the suggestion. It's definitely an issue with the Parquet connector; when we try with the CSV or Blackhole connector, everything works fine. I will try this approach and report back. Thanks, Natu On Wed, Dec 8, 2021 at 7:02 PM Ingo Bürk wrote: > Hi Natu, > > Something you could try is removing the packaged parquet format and > defining a custom format[1]. For this custom format you can then fix the > dependencies by packaging all of the following into the format: > > * flink-sql-parquet > * flink-shaded-hadoop-2-uber > * hadoop-aws > * aws-java-sdk-bundle > * guava > > This isn't entirely straight-forward, unfortunately, and I haven't > verified it. However, with Ververica Platform 2.6, to be released shortly > after Flink 1.15, it should also work again. > > [1] > https://docs.ververica.com/user_guide/sql_development/connectors.html#custom-connectors-and-formats > > > Best > Ingo > > On Tue, Dec 7, 2021 at 6:23 AM Natu Lauchande > wrote: > >> Hey Timo and Flink community, >> >> I wonder if there is a fix for this issue. The last time I rollbacked to >> version 12 of Flink and downgraded Ververica. 
>> >> I am really keen to leverage the new features on the latest versions of >> Ververica 2.5+ , i have tried a myriad of tricks suggested ( example : >> building the image with hadoop-client libraries) : >> >> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration >> at java.lang.Class.getDeclaredConstructors0(Native Method) >> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) >> at java.lang.Class.getDeclaredConstructors(Class.java:2020) >> at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass >> .java:1961) >> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79) >> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275) >> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273) >> at java.security.AccessController.doPrivileged(Native Method) >> at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass >> .java:272) >> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694) >> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java: >> 2003) >> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java: >> 1850) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2160) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream >> .java:2405) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: >> 2329) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2187) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream >> .java:2405) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: >> 2329) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2187) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream >> .java:2405) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: >> 2329) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2187) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream >> .java:2405) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: >> 2329) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2187) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream >> .java:2405) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: >> 2329) >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream >> .java:2187) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) >> at org.apache.flink.util.InstantiationUtil.deserializeObject( >> InstantiationUtil.java:615) >> at org.a
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Hi Natu, Something you could try is removing the packaged parquet format and defining a custom format[1]. For this custom format you can then fix the dependencies by packaging all of the following into the format: * flink-sql-parquet * flink-shaded-hadoop-2-uber * hadoop-aws * aws-java-sdk-bundle * guava This isn't entirely straight-forward, unfortunately, and I haven't verified it. However, with Ververica Platform 2.6, to be released shortly after Flink 1.15, it should also work again. [1] https://docs.ververica.com/user_guide/sql_development/connectors.html#custom-connectors-and-formats Best Ingo On Tue, Dec 7, 2021 at 6:23 AM Natu Lauchande wrote: > Hey Timo and Flink community, > > I wonder if there is a fix for this issue. The last time I rollbacked to > version 12 of Flink and downgraded Ververica. > > I am really keen to leverage the new features on the latest versions of > Ververica 2.5+ , i have tried a myriad of tricks suggested ( example : > building the image with hadoop-client libraries) : > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration > at java.lang.Class.getDeclaredConstructors0(Native Method) > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) > at java.lang.Class.getDeclaredConstructors(Class.java:2020) > at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass > .java:1961) > at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79) > at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275) > at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273) > at java.security.AccessController.doPrivileged(Native Method) > at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass > .java:272) > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java: > 2003) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850 > ) > at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2160) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: > 2405) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: > 2329) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2187) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: > 2405) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: > 2329) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2187) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: > 2405) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: > 2329) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2187) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: > 2405) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: > 2329) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2187) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: > 2405) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java: > 2329) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream > .java:2187) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:615) > at 
org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:600) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:587) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig( > InstantiationUtil.java:541) > at org.apache.flink.streaming.api.graph.StreamConfig > .getStreamOperatorFactory(StreamConfig.java:322) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.( > OperatorChain.java:159) > at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore( > StreamTask.java:551) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .runWithCleanUpOnFail(StreamTa
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Hey Timo and Flink community, I wonder if there is a fix for this issue. Last time, I rolled back to Flink 1.12 and downgraded Ververica. I am really keen to leverage the new features in the latest versions of Ververica 2.5+. I have tried a myriad of suggested tricks (for example, building the image with the hadoop-client libraries): java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at java.lang.Class.getDeclaredConstructors0(Native Method) at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) at java.lang.Class.getDeclaredConstructors(Class.java:2020) at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java: 1961) at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java: 272) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java: 2003) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2160) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: 2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: 2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: 2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: 2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java: 2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java: 2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.flink.util.InstantiationUtil.deserializeObject( InstantiationUtil.java:615) at org.apache.flink.util.InstantiationUtil.deserializeObject( InstantiationUtil.java:600) at org.apache.flink.util.InstantiationUtil.deserializeObject( InstantiationUtil.java:587) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig( InstantiationUtil.java:541) at org.apache.flink.streaming.api.graph.StreamConfig .getStreamOperatorFactory(StreamConfig.java:322) at org.apache.flink.streaming.runtime.tasks.OperatorChain.( OperatorChain.java:159) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore( StreamTask.java:551) at org.apache.flink.streaming.runtime.tasks.StreamTask .runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore( StreamTask.java:540) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf. Configuration at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at org.apache.flink.util.FlinkUserCodeClassLoader .loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) at org.apache.flink.util.ChildFirstClassLoader .loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass( FlinkUserCodeClassLoader.java:48) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 48 more This error occurs when writing to StreamFileWriting on S3 in Parquet format. Thanks, Natu On Thu, Jul 22, 2021 at 3:53 PM Timo Walther wrote: > Thanks, this should definit
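A quick way to narrow down a NoClassDefFoundError like the one above is to probe whether the missing class is actually visible from a given class loader (the stack trace shows it is absent from the job's user-code class loader). A small, self-contained diagnostic sketch — not Flink API, just plain JDK reflection:

```java
/**
 * Classpath probe for the error above: checks whether a class (here the
 * org.apache.hadoop.conf.Configuration from the stack trace) can be resolved
 * by a given class loader, without initializing it.
 */
public class HadoopClasspathProbe {

    public static boolean isClassVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve only, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String hadoopConf = "org.apache.hadoop.conf.Configuration";
        System.out.println(hadoopConf + " visible: "
                + isClassVisible(hadoopConf, HadoopClasspathProbe.class.getClassLoader()));
    }
}
```

Running such a probe inside the job (against the user-code class loader) versus in a plain main() on the TaskManager host helps tell a missing jar apart from a child-first/parent-first class loading issue.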
Re: Replacing S3 Client in Hadoop plugin
Hi Tamir, Thanks for providing the information. I don't know of a current solution right now; perhaps some other user has an idea. I do find your input valuable for future improvements with regard to the S3 client in Hadoop. Best regards, Martijn On Fri, 19 Nov 2021 at 09:21, Tamir Sagi wrote: > Hey Martijn, > > sorry for late respond. > > We wanted to replace the default client with our custom S3 client and not > use the AmazonS3Client provided by the plugin. > > We used Flink-s3-fs-hadoop v1.12.2 and for our needs we had to upgrade to > v1.14.0 [1]. > > AmazonS3 client factory is initialized[2] - if the property > "fs.s3a.s3.client.factory.impl" [3] is not provided the default factory is > created [4] which provides AmazonS3Client - which does not support what we > need. > I know that both the property and the factory interface are annotated with > > @InterfaceAudience.Private > @InterfaceStability.Unstable > from very early version. > > but we found this solution cleaner than extend the whole class and > override the #setAmazonS3Client method. > > Bottom line, all we had to do was to create our own implementation for > S3ClientFactory interface [5] > and add to flink-conf.yaml : s3.s3.client.factory.impl: <canonical name>. > place both the plugin and our artifact(with Factory and client impl) under > ${FLINK_HOME}/plugins/s3 > > One important note: Flink-s3-fs-hadoop plugin includes the whole > com.amazonaws.s3 source code, to avoid plugin class loader issues, we > needed to remove the aws-s3-java-sdk dependency and provide the plugin > dependency with scope "provided". > If the jobs needs to do some work with S3,then shading com.amazonaws was > also necessary. 
> > [1] > https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.14.0 > > [2] > https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L264-L266 > > [3] > https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L366-L369 > > [4] > https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java#L66 > > [5] > https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java > > Best, > Tamir. > > -- > *From:* Martijn Visser > *Sent:* Wednesday, October 13, 2021 8:28 PM > *To:* Tamir Sagi ; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Replacing S3 Client in Hadoop plugin > > > *EXTERNAL EMAIL* > > > Hi, > > Could you elaborate on why you would like to replace the S3 client? > > Best regards, > > Martijn > > On Wed, 13 Oct 2021 at 17:18, Tamir Sagi > wrote: > > I found the dependency > > > org.apache.hadoop > hadoop-aws > 3.3.1 > > > apparently its possible, there is a method > setAmazonS3Client > > I think I found the solution. > > Thanks. > > Tamir. > > -- > *From:* Tamir Sagi > *Sent:* Wednesday, October 13, 2021 5:44 PM > *To:* user@flink.apache.org > *Subject:* Replacing S3 Client in Hadoop plugin > > Hey community. > > I would like to know if there is any way to replace the S3 client in > Hadoop plugin[1] to a custom client(AmazonS3). 
> > I did notice that Hadoop plugin supports replacing the implementation of > S3AFileSystem using > "fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl") but not the client > itself [2] > > > fs.s3a.impl > org.apache.hadoop.fs.s3a.S3AFileSystem > The implementation class of the S3A Filesystem > > > I delved into Hadoop plugin source code [3] , the Client itself is of type > AmazonS3Client and cannot be replaced (for example) with a client of > type AmazonS3EncryptionV2. > > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins > Amazon S3 | Apache Flink > <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins> > Entropy injection for S3 file systems # The bundled S3 file systems > (flink-s3-fs-presto and flink-s3-fs-hadoop) support entropy > injection.Entropy injection is a technique to improve the scalability of > AWS S3 buckets through adding some random characters near the beginning of > the key. > ci.apache.org > > [2] > https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/inde
flink1.12: how to configure multiple Hadoop configurations, and a question about using S3
hi, Environment: 1. flink-1.12; the version can be upgraded. 2. env.hadoop.conf.dir is set in flink-conf, and that path contains the HDFS cluster's core-site.xml and hdfs-site.xml; the state.backend is stored on that HDFS. 3. Flink is deployed as K8S + session mode. Requirement: read files from an S3-protocol distributed file system, process them, and write the results to MySQL. Problem: the S3 settings follow the Hadoop configuration style and are saved as a new core-site.xml file, per https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A According to the official documentation, the Hadoop environment variables need to be changed, but that conflicts with the existing core-site.xml — two Hadoop config paths cannot be configured at the same time. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/ Alternatively, a pile of S3 settings can be added to flink-conf.yaml, but then they are hard-coded — what do we do when yet another S3 cluster is added? So I'd like to ask how to solve this kind of problem (modifying code is acceptable). How can multiple Hadoop configurations be set up (for example, reading data from a first file system (S3 protocol) and writing to a second file system (also S3 protocol))?
Replacing S3 Client in Hadoop plugin
Hey Martijn, sorry for the late response. We wanted to replace the default client with our custom S3 client and not use the AmazonS3Client provided by the plugin. We used flink-s3-fs-hadoop v1.12.2, and for our needs we had to upgrade to v1.14.0 [1]. The AmazonS3 client factory is initialized [2] - if the property "fs.s3a.s3.client.factory.impl" [3] is not provided, the default factory is created [4], which provides AmazonS3Client - which does not support what we need. I know that both the property and the factory interface are annotated with @InterfaceAudience.Private @InterfaceStability.Unstable from a very early version, but we found this solution cleaner than extending the whole class and overriding the #setAmazonS3Client method. Bottom line, all we had to do was create our own implementation of the S3ClientFactory interface [5] and add to flink-conf.yaml: s3.s3.client.factory.impl: <canonical name>. Place both the plugin and our artifact (with the factory and client impl) under ${FLINK_HOME}/plugins/s3. One important note: the flink-s3-fs-hadoop plugin includes the whole com.amazonaws.s3 source code; to avoid plugin class loader issues, we needed to remove the aws-s3-java-sdk dependency and provide the plugin dependency with scope "provided". If the job needs to do some work with S3, then shading com.amazonaws was also necessary. [1] https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.14.0 [2] https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L264-L266 [3] https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L366-L369 [4] https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java#L66 [5] https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java Best, Tamir. 
From: Martijn Visser Sent: Wednesday, October 13, 2021 8:28 PM To: Tamir Sagi ; user@flink.apache.org Subject: Re: Replacing S3 Client in Hadoop plugin EXTERNAL EMAIL Hi, Could you elaborate on why you would like to replace the S3 client? Best regards, Martijn On Wed, 13 Oct 2021 at 17:18, Tamir Sagi <tamir.s...@niceactimize.com> wrote: I found the dependency org.apache.hadoop hadoop-aws 3.3.1 apparently its possible, there is a method setAmazonS3Client I think I found the solution. Thanks. Tamir. From: Tamir Sagi <tamir.s...@niceactimize.com> Sent: Wednesday, October 13, 2021 5:44 PM To: user@flink.apache.org Subject: Replacing S3 Client in Hadoop plugin Hey community. I would like to know if there is any way to replace the S3 client in Hadoop plugin[1] to a custom client(AmazonS3). I did notice that Hadoop plugin supports replacing the implementation of S3AFileSystem using "fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl") but not the client itself [2] fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem The implementation class of the S3A Filesystem I delved into Hadoop plugin source code [3] , the Client itself is of type AmazonS3Client and cannot be replaced (for example) with a client of type AmazonS3EncryptionV2. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins Amazon S3 | Apache Flink <https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins> Entropy injection for S3 file systems # The bundled S3 file systems (flink-s3-fs-presto and flink-s3-fs-hadoop) support entropy injection. Entropy injection is a technique to improve the scalability of AWS S3 buckets through adding some random characters near the beginning of the key. 
ci.apache.org<http://ci.apache.org> [2] https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html<https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A> [3] https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java Hadoop-AWS module: Integration with Amazon Web Services - Apache Hadoop<https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=property%3E%0A%0A%3Cproperty%3E%0A%20%20%3Cname%3E-,fs.s3a.impl,-%3C/name%3E%0A%20%20%3Cvalue%3Eorg> Overview. Apache Hadoop’s hadoop-aws module provides support for AWS integration. applications to easily use this support.. To include the S3A client in Apache Hadoop’s default classpath: Make sure thatHADOOP_OPTIONAL_TOOLS in hadoop-env.sh includes hadoop-aws in its list of
Re: Replacing S3 Client in Hadoop plugin
Hi,

Could you elaborate on why you would like to replace the S3 client?

Best regards,

Martijn

On Wed, 13 Oct 2021 at 17:18, Tamir Sagi wrote:
> [quoted messages trimmed; Tamir's mails are archived in full below]
Re: Replacing S3 Client in Hadoop plugin
I found the dependency:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>3.3.1</version>
</dependency>

Apparently it is possible: there is a setAmazonS3Client method. I think I found the solution.

Thanks,
Tamir.

From: Tamir Sagi
Sent: Wednesday, October 13, 2021 5:44 PM
To: user@flink.apache.org
Subject: Replacing S3 Client in Hadoop plugin
> [original message trimmed; archived in full below]
Replacing S3 Client in Hadoop plugin
Hey community.

I would like to know if there is any way to replace the S3 client in the Hadoop plugin [1] with a custom client (AmazonS3).

I did notice that the Hadoop plugin supports replacing the implementation of S3AFileSystem via "fs.s3a.impl" (in flink-conf.yaml the key becomes "s3.impl"), but not the client itself [2]:

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

I delved into the Hadoop plugin source code [3]; the client itself is of type AmazonS3Client and cannot be replaced (for example) with a client of type AmazonS3EncryptionV2.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[2] https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
[3] https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Thank you,

Best,
Tamir.

Confidentiality: This communication and any attachments are intended for the above-named persons only and may be confidential and/or legally privileged. Any opinions expressed in this communication are not necessarily those of NICE Actimize. If this communication has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and attachments are free from any virus, we advise that in keeping with good computing practice the recipient should ensure they are actually virus free.
Re: Any one can help me? How to connect offline hadoop cluster and realtime hadoop cluster by different hive catalog?
Hi! It seems that your Flink cluster cannot connect to realtime-cluster-master001/xx.xx.xx.xx:8050. Please check your network and port status.

Jim Chen wrote on Friday, August 27, 2021 at 2:20 PM:
> [original question trimmed; archived in full below]
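[Editor's note] A quick way to verify the network and port status Caizhi mentions is to probe the ResourceManager address from the machine running the Flink client. The hostname and port come from the log line in the question; this is a reachability sketch, not a full diagnosis:

```shell
# Probe TCP reachability of the address from the "Retrying connect to server" log.
check_port() {
  # Uses bash's /dev/tcp pseudo-device; prints "reachable" or "unreachable".
  if timeout 3 bash -c "cat < /dev/null > /dev/tcp/$1/$2" 2>/dev/null; then
    echo "reachable"
  else
    echo "unreachable"
  fi
}

check_port realtime-cluster-master001 8050
```

If the port is unreachable, the fix is in firewall rules or cross-cluster routing rather than in the Hive catalog configuration.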
Any one can help me? How to connect offline hadoop cluster and realtime hadoop cluster by different hive catalog?
Hi All,

My Flink version is 1.13.1, and my company has two Hadoop clusters: an offline cluster and a realtime cluster. We want to submit a Flink job on the realtime cluster that connects to the offline cluster through a different Hive catalog; I use a different Hive configuration directory in each Hive catalog's configuration. The error log of the Flink job is as follows:

2021-08-27 13:50:22,902 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: realtime-cluster-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000 MILLISECONDS)

Can anyone help me? Thanks!
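[Editor's note] One common way to wire up the two clusters is to register two Hive catalogs, each pointing at its own configuration directory. The catalog names and paths below are illustrative assumptions; `hive-conf-dir` is the documented Flink Hive catalog option:

```sql
-- Two Hive catalogs, one per Hadoop cluster; 'hive-conf-dir' points at the
-- directory containing that cluster's hive-site.xml (paths are hypothetical).
CREATE CATALOG realtime_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/realtime-cluster/hive-conf'
);

CREATE CATALOG offline_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/offline-cluster/hive-conf'
);

-- Switch clusters per statement:
USE CATALOG offline_hive;
```

Note that the offline cluster's HDFS and YARN endpoints must also be reachable from the machines running the Flink job, which is exactly what the connection error above suggests is failing.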
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Could this be related to https://issues.apache.org/jira/browse/FLINK-22414?

On Thu, Jul 22, 2021 at 3:53 PM Timo Walther wrote:
> [quoted thread trimmed; the messages are archived in full below]
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Thanks, this should definitely work with the pre-packaged connectors of Ververica platform.

I guess we have to investigate what is going on. Until then, a workaround could be to add Hadoop manually and set the HADOOP_CLASSPATH environment variable. The root cause seems to be that Hadoop cannot be found.

Alternatively, you could also build a custom image and include Hadoop in the lib folder of Flink:

https://docs.ververica.com/v1.3/platform/installation/custom_images.html

I hope this helps. I will get back to you if we have a fix ready.

Regards,
Timo

On 22.07.21 14:30, Natu Lauchande wrote:
> [quoted messages trimmed; archived in full below]
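[Editor's note] The HADOOP_CLASSPATH workaround Timo describes is the same one suggested elsewhere on this list; it assumes a local Hadoop installation whose `hadoop` CLI is on the PATH:

```shell
# Make Hadoop's classes (org.apache.hadoop.conf.Configuration etc.) visible
# to Flink by exporting the classpath reported by the hadoop CLI.
setup_hadoop_classpath() {
  if command -v hadoop >/dev/null 2>&1; then
    export HADOOP_CLASSPATH="$(hadoop classpath)"
    echo "HADOOP_CLASSPATH set"
  else
    echo "hadoop CLI not found"
  fi
}

setup_hadoop_classpath
```

The variable must be set in the environment that launches the Flink processes; in a containerized setup like Ververica platform that means baking Hadoop into a custom image instead.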
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Sure.

This is what the table DDL looks like:

CREATE TABLE tablea (
  `a` BIGINT,
  `b` BIGINT,
  `c` BIGINT
)
COMMENT ''
WITH (
  'auto-compaction' = 'false',
  'connector' = 'filesystem',
  'format' = 'parquet',
  'parquet.block.size' = '134217728',
  'parquet.compression' = 'SNAPPY',
  'parquet.dictionary.page.size' = '1048576',
  'parquet.enable.dictionary' = 'true',
  'parquet.page.size' = '1048576',
  'parquet.writer.max-padding' = '2097152',
  'path' = 's3a://test/test',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.success-file.name' = '_SUCCESS',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.rolling-policy.check-interval' = '20 min',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '2 h'
);

When I change the connector to blackhole it immediately works without errors. I have redacted the names and paths.

Thanks,
Natu

On Thu, Jul 22, 2021 at 2:24 PM Timo Walther wrote:
> [quoted messages trimmed; archived in full below]
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Maybe you can also share which connector/format you are using? What is the DDL?

Regards,
Timo

On 22.07.21 14:11, Natu Lauchande wrote:
> Hey Timo,
>
> Thanks for the reply.
>
> No custom file, as we are using Flink SQL and submitting the job directly through the SQL Editor UI. We are using Flink 1.13.1 as the supported Flink version. No custom code; everything is through Flink SQL on the UI, no jars.
>
> Thanks,
> Natu
>
> On Thu, Jul 22, 2021 at 2:08 PM Timo Walther wrote:
>> Hi Natu,
>>
>> Ververica Platform 2.5 has updated the bundled Hadoop version but this should not result in a NoClassDefFoundError exception. How are you submitting your SQL jobs? You don't use Ververica's SQL service but have built a regular JAR file, right? If this is the case, can you share your pom.xml file with us? The Flink version stays constant at 1.12?
>>
>> Regards,
>> Timo
>>
>> On 22.07.21 12:22, Natu Lauchande wrote:
>>> Good day Flink community,
>>>
>>> Apache Flink/Ververica Community Edition - Question
>>>
>>> I am having an issue with my Flink SQL jobs since updating from Flink 1.12/Ververica 2.4 to Ververica 2.5. For all the jobs running on parquet and S3 I am getting the following error continuously:
>>>
>>> INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @ 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).
>>>
>>> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
>>>   at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
>>>   at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[?:1.8.0_292]
>>>   at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]
>>>   ...
>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_292]
>>>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   [the createOperatorChain/createOutputCollector pair repeats several times]
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
>>>   [message truncated in the archive]
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Hey Timo,

Thanks for the reply.

No custom file, as we are using Flink SQL and submitting the job directly through the SQL Editor UI. We are using Flink 1.13.1 as the supported Flink version. No custom code; everything is through Flink SQL on the UI, no jars.

Thanks,
Natu

On Thu, Jul 22, 2021 at 2:08 PM Timo Walther wrote:
> [quoted messages trimmed; archived in full elsewhere in this thread]
Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Hi Natu, Ververica Platform 2.5 has updated the bundled Hadoop version but this should not result in a NoClassDefFoundError exception. How are you submitting your SQL jobs? You don't use Ververica's SQL service but have built a regular JAR file, right? If this is the case, can you share your pom.xml file with us? The Flink version stays constant at 1.12? Regards, Timo On 22.07.21 12:22, Natu Lauchande wrote: [quoted message and stack trace omitted; they are reproduced in full in the original message below]
Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
Good day Flink community, Apache Flink/Ververica Community Edition - Question I am having an issue with my Flink SQL jobs since updating from Flink 1.12/Ververica 2.4 to Ververica 2.5. For all the jobs running on parquet and S3 I am getting the following error continuously: INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @ 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local (dataPort=39309).

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method) ~[?:1.8.0_292]
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[?:1.8.0_292]
at java.lang.Class.getDeclaredConstructors(Class.java:2020) ~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_292]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:181) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292]
... 57 more
2021-07-22 09:38:43,095 DEBUG org.apache.flink.runtime.scheduler.SharedSlot [] - Remove logical slot (SlotRequestId{4297879e795d0516e36a7c26ccc795b2}) for execution vertex (id cbc357ccb763df2852fee8c4fc7d55f2_0) from
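When a NoClassDefFoundError like the one above appears, a quick sanity check is to scan the effective classpath for the missing class. A minimal sketch, assuming a POSIX shell with unzip available; the find_class helper is illustrative, not part of Flink or Hadoop:

```shell
# find_class CLASSPATH CLASS_ENTRY: print every jar on a colon-separated
# classpath whose listing contains the given class file entry.
find_class() {
  cp="$1"; cls="$2"
  printf '%s\n' "$cp" | tr ':' '\n' | while IFS= read -r jar; do
    [ -f "$jar" ] || continue                      # skip dirs and missing entries
    if unzip -l "$jar" 2>/dev/null | grep -q "$cls"; then
      printf '%s\n' "$jar"
    fi
  done
}

# Example: check whether Hadoop's Configuration class is reachable.
# HADOOP_CLASSPATH would normally come from `hadoop classpath`.
find_class "${HADOOP_CLASSPATH:-}" 'org/apache/hadoop/conf/Configuration.class'
```

If nothing is printed, the class genuinely is not on that classpath, which matches the ClassNotFoundException in the trace above.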
Re: Failure running Flink locally with flink-s3-fs-hadoop + AWS SDK v2 as a dependency
Hi, sorry for resurrecting the old thread, but I have precisely the same issue with a different filesystem. I've tried using plugins dir, setting FLINK_PLUGINS_DIR, etc. - nothing works locally. I added a breakpoint to the PluginConfig.getPluginsDir method and confirmed it's not even called. Could this be a LocalStreamEnvironment limitation? Is there any way to enable plugin loading locally? Thanks! On 2021/06/21 11:13:29, Yuval Itzchakov wrote: > Currently I have the s3-hadoop dependency in my build.sbt. > > I guess I need to move it to the PLUGIN directory locally as well, didn't > think of that. > > On Mon, Jun 21, 2021, 13:35 Dawid Wysakowicz wrote: > > > Hi, > > > > Where do you use the AWS SDK v2? Filesystems should use the plugins > > subsystem[1]. Therefore when running from within the IDE make sure you have > > the s3 plugin in ${WORKING_DIR}/plugins/s3 and put the flink-s3-fs-hadoop > > dependency there. Or specify a different directory for plugins via the ENV > > variable: FLINK_PLUGINS_DIR. > > > > If you want to use the AWS SDK v2 for the s3 filesystem I guess you need > > to build the jar yourself. > > > > Best, > > > > Dawid > > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/ > > > > > > On 20/06/2021 15:00, Yuval Itzchakov wrote: > > > > Hi, > > > > I'm trying to run Flink with AWS SDK v2 and restore a Flink savepoint > > that's stored in S3. 
> > > > When adding the dependency on flink-s3-fs-hadoop locally, I fail to run > > Flink at runtime with the following error: > > > > Caused by: java.lang.invoke.LambdaConversionException: Invalid receiver > > type interface org.apache.http.Header; not a subtype of implementation type > > interface org.apache.http.NameValuePair > > > > This happens because the flink-s3 JAR is a fat JAR containing an old > > version of org.apache.http.HttpMessage which does not implement the > > NameValuePair interface (there was a non backwards compatible change in the > > library, see https://github.com/aws/aws-sdk-java-v2/issues/652) > > > > Since the library itself isn't shaded, it gets loaded first overriding the > > HttpClient newer version I have on the classpath and blows up when > > bootstrapping Flink. > > > > Any advice on how to work around this? > > -- > > Best Regards, > > Yuval Itzchakov. > > > > >
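As a concrete illustration of Dawid's suggestion, the plugin layout for a local run can be sketched as below. The jar name, version, and working directory are assumptions; adjust them to your Flink version:

```shell
# Flink discovers filesystems from ${FLINK_PLUGINS_DIR:-./plugins}/<name>/*.jar,
# each plugin directory getting its own isolated classloader.
PLUGINS_DIR="${FLINK_PLUGINS_DIR:-$PWD/plugins}"
mkdir -p "$PLUGINS_DIR/s3-fs-hadoop"

# Copy the filesystem jar (downloaded from Maven Central) into the plugin dir.
JAR=flink-s3-fs-hadoop-1.13.1.jar   # hypothetical local path
[ -f "$JAR" ] && cp "$JAR" "$PLUGINS_DIR/s3-fs-hadoop/"

# Point the local environment at the plugins directory before starting the job.
export FLINK_PLUGINS_DIR="$PLUGINS_DIR"
ls "$PLUGINS_DIR"
```

The key point is that the jar lives in its own subdirectory under the plugins root rather than on the application classpath, so its bundled HttpClient cannot shadow the one in user code.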
Re: Re: Re: Re: Flink 1.11.1 on k8s: how to configure Hadoop
Hi, I ran into this problem as well. How exactly did you build your image? In my Dockerfile I add: COPY --chown=flink:flink jars/flink-shaded-hadoop-2-2.8.3-10.0.jar $FLINK_HOME/lib/ But when running a Flink 1.11 session cluster on k8s, the job server starts, yet submitting a job fails with an error saying HadoopUtils could not be initialized, even though flink-shaded-hadoop-2-2.8.3-10.0.jar definitely contains the HadoopUtils class. Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils <http://apache-flink.147419.n8.nabble.com/file/t1510/WX20210522-115421%402x.png> -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink Hive connector: hive-conf-dir supports hdfs URI, while hadoop-conf-dir supports local path only?
Hi community, This question is cross-posted on Stack Overflow https://stackoverflow.com/questions/67264156/flink-hive-connector-hive-conf-dir-supports-hdfs-uri-while-hadoop-conf-dir-sup In my current setup, the local dev env can access the testing env. I would like to run a Flink job on the local dev env while reading/writing data from/to the testing env's Hive. This is what I do: ``` CREATE CATALOG hive WITH ( 'type' = 'hive', 'hive-conf-dir' = 'hdfs://testhdp273/hive/conf' ) ``` However, I realize I also need to specify a matching Hadoop classpath, therefore I want to also define `hadoop-conf-dir` that actually points to the Hadoop conf in the testing env. However, as stated in the [docs]( https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/#hadoop-conf-dir ): > Path to Hadoop conf dir. Only local file system paths are supported. The recommended way to set Hadoop conf is via the HADOOP_CONF_DIR environment variable. Use the option only if environment variable doesn't work for you, e.g. if you want to configure each HiveCatalog separately. I wonder why hadoop-conf-dir only supports a local path, while hive-conf-dir supports any legitimate HDFS path? Is there any workaround for this? Any help? Thanks! Best, Yik San
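One possible workaround, given that hadoop-conf-dir only accepts local paths, is to materialize the remote conf directory locally and point HADOOP_CONF_DIR at the copy. A sketch with assumed paths; the hdfs dfs -get line requires an HDFS client and is shown commented out:

```shell
# Stage the testing-cluster Hadoop conf on the local dev machine.
LOCAL_CONF=/tmp/testhdp273-hadoop-conf
mkdir -p "$LOCAL_CONF"

# Pull the *.xml config files from the remote cluster (requires the hdfs CLI):
# hdfs dfs -get 'hdfs://testhdp273/hadoop/conf/*.xml' "$LOCAL_CONF"/

# The docs recommend the environment variable over the hadoop-conf-dir option:
export HADOOP_CONF_DIR="$LOCAL_CONF"
echo "HADOOP_CONF_DIR=$HADOOP_CONF_DIR"
```

The local copy must of course be kept in sync with the remote cluster's configuration by hand or by a periodic job.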
Re: Flink Hadoop config on docker-compose
I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks for bringing it up. Matthias [1] https://issues.apache.org/jira/browse/FLINK-22414 On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier wrote: > Hi Yang, > isn't this something to fix? If I look at the documentation at [1], in > the "Passing configuration via environment variables" section, there is: > > "The environment variable FLINK_PROPERTIES should contain a list of Flink > cluster configuration options separated by new line, > the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence > over configurations in flink-conf.yaml." > > To me this means that if I specify "env.hadoop.conf.dir" it should be > handled as well. Am I wrong? > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html > > Best, > Flavio > > [deeper quoted messages omitted; they appear in full later in this thread]
Re: Flink Hadoop config on docker-compose
Great! Thanks for the support On Thu, Apr 22, 2021 at 2:57 PM Matthias Pohl wrote: > I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks > for bringing it up. > > Matthias > > [1] https://issues.apache.org/jira/browse/FLINK-22414 > > [deeper quoted messages omitted; they appear in full later in this thread]
Re: Flink Hadoop config on docker-compose
It seems that we do not export HADOOP_CONF_DIR as an environment variable in the current implementation, even though we have set the env.xxx Flink config options. It is only used to construct the classpath for the JM/TM process[1]. However, in "HadoopUtils"[2] we do not support getting the hadoop configuration from the classpath. [1]. https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256 [2]. https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64 Best, Yang Flavio Pompermaier wrote on Fri, Apr 16, 2021 at 3:55 AM: > Hi Robert, > indeed my docker-compose does work only if I add also Hadoop and yarn home > while I was expecting that those two variables were generated automatically > just setting env.xxx variables in FLINK_PROPERTIES variable.. > > I just want to understand what to expect, if I really need to specify > Hadoop and yarn home as env variables or not > > [deeper quoted messages omitted; they appear in full later in this thread]
Re: Flink Hadoop config on docker-compose
Hi Robert, indeed my docker-compose does work only if I also add the Hadoop and YARN home directories, while I was expecting that those two variables would be generated automatically just by setting the env.xxx variables in the FLINK_PROPERTIES variable. I just want to understand what to expect, i.e. whether I really need to specify the Hadoop and YARN homes as env variables or not. On Thu, Apr 15, 2021 at 8:39 PM Robert Metzger wrote: > [quoted message omitted; it appears in full later in this thread]
Re: Flink Hadoop config on docker-compose
Hi, I'm not aware of any known issues with Hadoop and Flink on Docker. I also tried what you are doing locally, and it seems to work:

flink-jobmanager | 2021-04-15 18:37:48,300 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting StandaloneSessionClusterEntrypoint.
flink-jobmanager | 2021-04-15 18:37:48,338 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install default filesystem.
flink-jobmanager | 2021-04-15 18:37:48,375 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install security context.
flink-jobmanager | 2021-04-15 18:37:48,404 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to flink (auth:SIMPLE)
flink-jobmanager | 2021-04-15 18:37:48,408 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-811306162058602256.conf.
flink-jobmanager | 2021-04-15 18:37:48,415 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Initializing cluster services.

Here's my code: https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39 Hope this helps! On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier wrote: > [original message omitted; it appears in full below]
Flink Hadoop config on docker-compose
Hi everybody,
I'm trying to set up reading from HDFS using docker-compose and Flink 1.11.3.
If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir' using FLINK_PROPERTIES (under the environment section of the docker-compose service), I see the following line in the logs:

"Could not find Hadoop configuration via any of the supported method"

If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not generated by the run scripts. Indeed, if I add HADOOP_CONF_DIR and YARN_CONF_DIR (again under the environment section of the docker-compose service), I don't see that line.

Is this the expected behavior?

Below is the relevant docker-compose service I use (I've removed the content of HADOOP_CLASSPATH because it is too long, and I didn't include the taskmanager, which is similar):

flink-jobmanager:
  container_name: flink-jobmanager
  build:
    context: .
    dockerfile: Dockerfile.flink
    args:
      FLINK_VERSION: 1.11.3-scala_2.12-java11
  image: 'flink-test:1.11.3-scala_2.12-java11'
  ports:
    - "8091:8081"
    - "8092:8082"
  command: jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      rest.port: 8081
      historyserver.web.port: 8082
      web.upload.dir: /opt/flink
      env.hadoop.conf.dir: /opt/hadoop/conf
      env.yarn.conf.dir: /opt/hadoop/conf
    - |
      HADOOP_CLASSPATH=...
    - HADOOP_CONF_DIR=/opt/hadoop/conf
    - YARN_CONF_DIR=/opt/hadoop/conf
  volumes:
    - 'flink_shared_folder:/tmp/test'
    - 'flink_uploads:/opt/flink/flink-web-upload'
    - 'flink_hadoop_conf:/opt/hadoop/conf'
    - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'

Thanks in advance for any support,
Flavio
Re: Hadoop is not in the classpath/dependencies
This looks related to HDFS-12920, where Hadoop 2.x tries to read a duration from hdfs-default.xml expecting a plain number, but in 3.x the values also contain time units.

On 3/30/2021 9:37 AM, Matthias Seiler wrote:

Thank you all for the replies!

I did as @Maminspapin suggested and indeed the previous error disappeared, but now the exception is

```
java.io.IOException: Cannot instantiate file system for URI: hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```

I thought that it related to the windowing I do, which has a slide interval of 30 seconds, but removing it produces the same error. I also added the dependency to the Maven pom, but without effect. Since I use Hadoop 3.2.1, I also tried https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber, but with this I can't even start a cluster (`TaskManager initialization failed`).

@Robert, Flink includes roughly 100 HDFS jars. `hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to contain `DistributedFileSystem.class`, which I checked by running `jar tvf hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep DistributedFileSystem`. How can I verify that the class is really accessible?

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:

Hey Matthias,

Maybe the classpath contains the Hadoop libraries, but not the HDFS libraries? The "DistributedFileSystem" class needs to be accessible to the classloader. Can you check if that class is available?

Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler <matthias.sei...@campus.tu-berlin.de> wrote:

Hello everybody,

I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines. The job should store the checkpoints on HDFS like so:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws

```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
// ...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
```

and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of the Hadoop libraries with wildcards. Flink's JobManager prints the classpath, which includes specific packages from these Hadoop libraries. Besides that, Flink creates the state directories on HDFS, but no content.

Thank you for any advice,
Matthias
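The `For input string: "30s"` failure above can be reproduced without Hadoop at all: older clients read such config values with a plain `Long.parseLong`-style conversion, while Hadoop 3 reads them with a unit-aware parser (`Configuration.getTimeDuration`). A minimal, dependency-free sketch of the difference (the unit handling here is deliberately simplified to a trailing "s"; the real Hadoop parser supports more units):

```java
public class DurationParsing {

    // Hadoop 2.x-style read: expects a bare number.
    static long parseLegacy(String value) {
        return Long.parseLong(value); // throws NumberFormatException for "30s"
    }

    // Hadoop 3.x-style read (simplified): tolerates a trailing seconds unit.
    static long parseWithUnit(String value) {
        String v = value.trim();
        if (v.endsWith("s")) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) {
        System.out.println(parseWithUnit("30s")); // prints 30
        try {
            parseLegacy("30s");
        } catch (NumberFormatException e) {
            // Same failure mode as a shaded Hadoop 2 client reading a
            // Hadoop 3 hdfs-default.xml value such as "30s".
            System.out.println(e.getMessage());
        }
    }
}
```

This is why pairing a Hadoop 2 shaded client with Hadoop 3 configuration files fails, and why matching the client to the cluster's major version (or putting the real cluster's `hadoop classpath` on the Flink classpath) avoids the problem.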
Re: Hadoop is not in the classpath/dependencies
Thank you all for the replies!

I did as @Maminspapin suggested and indeed the previous error disappeared, but now the exception is

```
java.io.IOException: Cannot instantiate file system for URI: hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```

I thought that it relates to the windowing I do, which has a slide interval of 30 seconds, but removing it displays the same error. I also added the dependency to the maven pom, but without effect. Since I use Hadoop 3.2.1, I also tried https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber but with this I can't even start a cluster (`TaskManager initialization failed`).

@Robert, Flink includes roughly 100 hdfs jars. `hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to contain `DistributedFileSystem.class`, which I checked running `jar tvf hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep DistributedFileSystem`. How can I verify that the class is really accessible?

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:
> Hey Matthias,
>
> Maybe the classpath contains hadoop libraries, but not the HDFS
> libraries? The "DistributedFileSystem" class needs to be accessible to
> the classloader. Can you check if that class is available?
>
> Best,
> Robert
>
> On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler
> <matthias.sei...@campus.tu-berlin.de> wrote:
>
> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
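One way to answer "how can I verify that the class is really accessible?" is to ask the JVM directly, from a small program launched with the same classpath as the Flink processes. The HDFS class name below is the real client entry point; the rest is a generic visibility check, not Flink-specific tooling:

```java
public class ClassCheck {

    // Returns true if the named class is visible to this JVM's classloader.
    // Initialization is skipped (second argument false), so this tests only
    // whether the class file can be found, not whether it links cleanly.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Run this with the same CLASSPATH/HADOOP_CLASSPATH as the JobManager:
        System.out.println("DistributedFileSystem loadable: "
                + isLoadable("org.apache.hadoop.hdfs.DistributedFileSystem"));
        // Sanity check with a class that is always present:
        System.out.println("ArrayList loadable: "
                + isLoadable("java.util.ArrayList")); // prints true
    }
}
```

If the first check prints `false` inside the container or on the cluster host, the wildcard classpath is not being expanded the way the shell session suggests.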
Re: Hadoop is not in the classpath/dependencies
Hey Matthias,

Maybe the classpath contains hadoop libraries, but not the HDFS libraries? The "DistributedFileSystem" class needs to be accessible to the classloader. Can you check if that class is available?

Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler <matthias.sei...@campus.tu-berlin.de> wrote:
> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
Issues with the Hadoop Configuration on YARN
hi all,

We ran into several problems with how Flink builds the Hadoop Configuration when running on YARN (Hadoop 3.1):

1. On YARN, Hadoop/YARN settings from flink-conf are not applied: https://issues.apache.org/jira/browse/FLINK-21981
2. When constructing the Hadoop Configuration on YARN, the YARN configuration is not merged into the resulting Configuration: https://issues.apache.org/jira/browse/FLINK-21982
3. When shipping hdfs-site/core-site files with flink -yt, Flink's Hadoop Configuration does not pick them up, because the hdfs-site/core-site files are looked up via classloader.getResource.

Could someone help review and merge FLINK-21640?

thanks all
Re: Hadoop is not in the classpath/dependencies
I downloaded the library (latest version) from here: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/ and put it in the flink_home/lib directory. That fixed it.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Hadoop is not in the classpath/dependencies
I have the same problem ... -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hadoop is not in the classpath/dependencies
Hello everybody,

I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines. The job should store the checkpoints on HDFS like so:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws

```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
// ...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
```

and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with wildcards. Flink's JobManager prints the classpath, which includes specific packages from these Hadoop libraries. Besides that, Flink creates the state directories on HDFS, but no content.

Thank you for any advice,
Matthias
Can the Flink JobManager in HA mode be manually restarted, like the Hadoop NameNode?
Can the Flink JobManager in HA mode be rolling-restarted like the Hadoop NameNode, while keeping the cluster running? I've noticed that the JobManager's memory usage seems to grow slowly but continuously. The Hadoop NameNode has the same problem, which I work around by rolling-restarting it periodically. In HA mode, can the Flink JobManagers be rolling-restarted the same way?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Hadoop Integration Link broken in downloads page
Thanks a lot for reporting this problem, Debraj. I've created a JIRA issue for it [1].

[1] https://issues.apache.org/jira/browse/FLINK-21723

Cheers,
Till

On Tue, Mar 9, 2021 at 5:28 AM Debraj Manna wrote:
> Hi
>
> It appears the Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
> link is broken on the downloads <https://flink.apache.org/downloads.html>
> page.
>
>> Apache Flink® 1.12.2 is our latest stable release.
>> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
>> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
>> system connector), please check out the Hadoop Integration
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
>> documentation.
>
> It is throwing a 404 error.
>
> Thanks
Hadoop Integration Link broken in downloads page
Hi

It appears the Hadoop Integration
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
link is broken on the downloads <https://flink.apache.org/downloads.html> page.

> Apache Flink® 1.12.2 is our latest stable release.
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector), please check out the Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/deployment/hadoop.html>
> documentation.

It is throwing a 404 error.

Thanks
[DISCUSS] Removal of flink-swift-fs-hadoop module
Hi all, during a security maintenance PR [1], Chesnay noticed that the flink-swift-fs-hadoop module is lacking test coverage [2]. Also, there hasn't been any substantial change since 2018, when it was introduced. On the user@ ML, I could not find any proof of significant use of the module (no one mentioned any problems with it). *I propose to remove this module in Flink 1.13*. Otherwise, we would release a module with known vulnerable dependencies, and an unknown stability. If there are users, they can still use the 1.12.0 release of it. If we notice that there are a lot of users, we can reintroduce the FS, and add proper tests for it. Please let me know if you have any concerns, otherwise, I'll remove it. [1] https://github.com/apache/flink/pull/14749 [2] https://issues.apache.org/jira/browse/FLINK-20804
Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
Also: for writing ORC-format files, if anyone knows how to write Map-typed columns, please share an example. As below, once I have the MapColumnVector, how do I populate it? Non-Map simple fields are straightforward, just set the value at xxxColumnVector.vector[rowId], but the MapColumnVector API is confusing and I could not figure out how to use it.

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];

赵一旦 wrote on Sat, Jan 23, 2021 at 1:42 PM:
> Solved. I overrode this part of the Flink source code to remove the check that restricts recoverable writers to the hdfs scheme.
>
> 张锴 wrote on Thu, Jan 21, 2021 at 7:35 PM:
>> @赵一旦
>> Also, about the other question I asked you last time: I tried the idea you suggested, but something seems off; could you take a look?
>>
>> 张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM:
>>> I'm on Flink 1.10, where the file sink is BucketingSink; that's what I use to write to HDFS.
>>>
>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
>>>> @Michael Ran: understood, no problem.
>>>>
>>>> @张锴 Which Flink version's connector do you mean, and DataStream or SQL? I searched and my version doesn't have it. I'm on 1.12, DataStream.
>>>>
>>>> The docs show both StreamingFileSink and FileSink, and from the docs their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems; atomic writes matter here, since the distributed file system doesn't support append or edit.
>>>>
>>>> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
>>>>> Sorry, I haven't used this in a long time. But you can analyze the exception and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you have to implement your own sink; or, if your file system's write path is compatible with the HDFS high-level API, you can refer to how the existing sinks are implemented.
>>>>> On 2021-01-21 18:45:06, 张锴 wrote:
>>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>>>>
>>>>>> Try sink.setBucketer and sink.setWriter.
>>>>>>
>>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
>>>>>>> @Michael Ran
>>>>>>> Is there a workaround, then? I hit this exception when writing to HDFS with Flink's StreamingFileSink.
>>>>>>>
>>>>>>> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
>>>>>>>> This probably uses an HDFS-specific API that the file system doesn't implement: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>>>>>>>> On 2021-01-21 17:18:23, 赵一旦 wrote:
>>>>>>>>> The full error is:
>>>>>>>>>
>>>>>>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>>>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
>>>>>>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>>>>>>>>> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
>>>>>>>>> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
>>>>>>>>> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
>>>>>>>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>>>>>>>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>>>>> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
>>>>>>>>>> Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>>>>
>>>>>>>>>> As above: we use the hadoop scheme, but the underlying storage is not HDFS; it's our company's in-house distributed file system.
>>>>>>>>>>
>>>>>>>>>> Writing with Spark and reading with spark-sql both work fine, but so far neither writing nor reading with Flink has succeeded.
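Regarding the MapColumnVector question above: ORC's multi-valued vectors flatten every row's entries into shared child vectors (`keys`/`values`), with `offsets[row]` and `lengths[row]` pointing into them and `childCount` tracking the next free child slot. Below is a dependency-free sketch of that layout; plain Java lists stand in for the real `BytesColumnVector`/`LongColumnVector` children, so treat the exact ORC field usage as an assumption to verify against the ORC javadocs:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of the MapColumnVector layout: all maps in a batch share flat child
// arrays; each row records where its entries start (offsets[row]) and how
// many entries it has (lengths[row]).
public class FlatMapColumn {
    final List<String> keys = new ArrayList<>();  // stands in for mapVector.keys
    final List<Long> values = new ArrayList<>();  // stands in for mapVector.values
    final long[] offsets;
    final long[] lengths;
    int childCount = 0;                           // next free slot in the child arrays

    FlatMapColumn(int maxRows) {
        offsets = new long[maxRows];
        lengths = new long[maxRows];
    }

    // Write one row's map: record offset/length, then append each entry
    // to the shared child arrays.
    void setRow(int row, Map<String, Long> map) {
        offsets[row] = childCount;
        lengths[row] = map.size();
        for (Map.Entry<String, Long> e : map.entrySet()) {
            keys.add(e.getKey());
            values.add(e.getValue());
            childCount++;
        }
    }

    // Read one row's map back by walking its slice of the child arrays.
    Map<String, Long> getRow(int row) {
        Map<String, Long> out = new LinkedHashMap<>();
        for (int i = 0; i < lengths[row]; i++) {
            int idx = (int) offsets[row] + i;
            out.put(keys.get(idx), values.get(idx));
        }
        return out;
    }
}
```

With the real API, the per-row write should have the same shape: set `mapVector.offsets[row] = mapVector.childCount` and `mapVector.lengths[row]` to the entry count, then write each key and value into the child vectors at index `mapVector.childCount`, incrementing it per entry.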
Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
Solved. I overrode this part of the Flink source code to remove the check that restricts recoverable writers to the hdfs scheme.

张锴 wrote on Thu, Jan 21, 2021 at 7:35 PM:
> @赵一旦
> Also, about the other question I asked you last time: I tried the idea you suggested, but something seems off; could you take a look?
>
> 张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM:
>> I'm on Flink 1.10, where the file sink is BucketingSink; that's what I use to write to HDFS.
>>
>> 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
>>> @Michael Ran: understood, no problem.
>>>
>>> @张锴 Which Flink version's connector do you mean, and DataStream or SQL? I searched and my version doesn't have it. I'm on 1.12, DataStream.
>>>
>>> The docs show both StreamingFileSink and FileSink, and from the docs their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems; atomic writes matter here, since the distributed file system doesn't support append or edit.
>>>
>>> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
>>>> Sorry, I haven't used this in a long time. But you can analyze the exception and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you have to implement your own sink; or, if your file system's write path is compatible with the HDFS high-level API, you can refer to how the existing sinks are implemented.
>>>> On 2021-01-21 18:45:06, 张锴 wrote:
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>>>
>>>>> Try sink.setBucketer and sink.setWriter.
>>>>>
>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
>>>>>> @Michael Ran
>>>>>> Is there a workaround, then? I hit this exception when writing to HDFS with Flink's StreamingFileSink.
>>>>>>
>>>>>> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
>>>>>>> This probably uses an HDFS-specific API that the file system doesn't implement: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>>>>>>> On 2021-01-21 17:18:23, 赵一旦 wrote:
>>>>>>>> The full error is:
>>>>>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>>>>>>>> [...]
>>>>>>>>
>>>>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
>>>>>>>>> Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>>> As above: we use the hadoop scheme, but the underlying storage is not HDFS; it's our company's in-house distributed file system.
>>>>>>>>> Writing with Spark and reading with spark-sql both work fine, but so far neither writing nor reading with Flink has succeeded.
Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
@赵一旦
Also, about the other question I asked you last time: I tried the idea you suggested, but something seems off; could you take a look?

张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM:
> I'm on Flink 1.10, where the file sink is BucketingSink; that's what I use to write to HDFS.
>
> 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
>> @Michael Ran: understood, no problem.
>>
>> @张锴 Which Flink version's connector do you mean, and DataStream or SQL? I searched and my version doesn't have it. I'm on 1.12, DataStream.
>>
>> The docs show both StreamingFileSink and FileSink, and from the docs their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems; atomic writes matter here, since the distributed file system doesn't support append or edit.
>>
>> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
>>> Sorry, I haven't used this in a long time. But you can analyze the exception and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you have to implement your own sink; or, if your file system's write path is compatible with the HDFS high-level API, you can refer to how the existing sinks are implemented.
>>> On 2021-01-21 18:45:06, 张锴 wrote:
>>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>>
>>>> Try sink.setBucketer and sink.setWriter.
>>>>
>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
>>>>> @Michael Ran
>>>>> Is there a workaround, then? I hit this exception when writing to HDFS with Flink's StreamingFileSink.
>>>>>
>>>>> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
>>>>>> This probably uses an HDFS-specific API that the file system doesn't implement: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>>>>>> On 2021-01-21 17:18:23, 赵一旦 wrote:
>>>>>>> The full error is:
>>>>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>>>>>>> [...]
>>>>>>>
>>>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
>>>>>>>> Recoverable writers on Hadoop are only supported for HDFS
>>>>>>>> As above: we use the hadoop scheme, but the underlying storage is not HDFS; it's our company's in-house distributed file system.
>>>>>>>> Writing with Spark and reading with spark-sql both work fine, but so far neither writing nor reading with Flink has succeeded.
Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
I'm on Flink 1.10, where the file sink is BucketingSink; that's what I use to write to HDFS.

赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM:
> @Michael Ran: understood, no problem.
>
> @张锴 Which Flink version's connector do you mean, and DataStream or SQL? I searched and my version doesn't have it. I'm on 1.12, DataStream.
>
> The docs show both StreamingFileSink and FileSink, and from the docs their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems; atomic writes matter here, since the distributed file system doesn't support append or edit.
>
> Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
>> Sorry, I haven't used this in a long time. But you can analyze the exception and the API source code to determine whether direct writes are possible. If you need to write to a custom file system, you have to implement your own sink; or, if your file system's write path is compatible with the HDFS high-level API, you can refer to how the existing sinks are implemented.
>> On 2021-01-21 18:45:06, 张锴 wrote:
>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>
>>> Try sink.setBucketer and sink.setWriter.
>>>
>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
>>>> @Michael Ran
>>>> Is there a workaround, then? I hit this exception when writing to HDFS with Flink's StreamingFileSink.
>>>>
>>>> Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
>>>>> This probably uses an HDFS-specific API that the file system doesn't implement: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
>>>>> On 2021-01-21 17:18:23, 赵一旦 wrote:
>>>>>> The full error is:
>>>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
>>>>>> [...]
>>>>>>
>>>>>> 赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
>>>>>>> Recoverable writers on Hadoop are only supported for HDFS
>>>>>>> As above: we use the hadoop scheme, but the underlying storage is not HDFS; it's our company's in-house distributed file system.
>>>>>>> Writing with Spark and reading with spark-sql both work fine, but so far neither writing nor reading with Flink has succeeded.
Re: Re: Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
@Michael Ran: OK, no worries.

@张锴: Which Flink version's connector do you mean, and DataStream or SQL? I searched and don't have it; I'm on 1.12, DataStream.

The docs show both StreamingFileSink and FileSink, and their usage looks similar. I plan to try FileSink, but I'm not clear on the difference between FileSink and StreamingFileSink, or whether both can write to Hadoop-style file systems. Atomic writes matter here, since distributed file systems generally don't support append or in-place edits.

Michael Ran wrote on Thu, Jan 21, 2021 at 7:01 PM:
Re:Re: Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
Sorry, I haven't used this in a long time. But you can analyze the exception and step through the API source to determine whether direct writes are possible. If you need to write to a custom file system, you'll have to implement your own sink; alternatively, if your file system's write path is compatible with the upper-level HDFS API, you can borrow from how the existing sinks are written.

On 2021-01-21 18:45:06, 张锴 wrote:
Re: Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

Try it this way, via sink.setBucketer and sink.setWriter.

赵一旦 wrote on Thu, Jan 21, 2021 at 6:37 PM:
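The suggestion above can be sketched end to end. This is a minimal, untested example of the legacy BucketingSink from the flink-connector-filesystem module (the Flink 1.10-era API the thread is discussing); the host, port, path, bucket format, and batch size are placeholders, not values from the thread:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object BucketingSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)

    // BucketingSink writes through the plain Hadoop FileSystem API and does not
    // ask for a RecoverableWriter, which is why it is being suggested here for
    // a Hadoop-compatible file system that is not real HDFS.
    val sink = new BucketingSink[String]("hdfs://namenode:8020/flink/out")
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
    sink.setWriter(new StringWriter[String]())
    sink.setBatchSize(128L * 1024 * 1024) // roll part files at ~128 MB

    stream.addSink(sink)
    env.execute("bucketing-sink-sketch")
  }
}
```

Note the trade-off: BucketingSink relies on truncate/append for exactly-once recovery, so its guarantees on a non-HDFS store still depend on what that store supports.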
Re: Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
@Michael Ran
So is there any workaround? I hit this exception when writing to HDFS with Flink's StreamingFileSink.

Michael Ran wrote on Thu, Jan 21, 2021 at 5:23 PM:
Re:Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
This is probably an HDFS-specific API that your file system doesn't implement: public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}

On 2021-01-21 17:18:23, 赵一旦 wrote:
Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
On top of that, reading our existing Hive data warehouse with Flink SQL also fails: with the Hive catalog configured, the table metadata all shows up, but SELECT queries keep failing.

赵一旦 wrote on Thu, Jan 21, 2021 at 5:18 PM:
Re: Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
The full error is:

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

赵一旦 wrote on Thu, Jan 21, 2021 at 5:17 PM:
Flink job fails at submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS
Recoverable writers on Hadoop are only supported for HDFS

As above: we use the hadoop:// protocol, but the underlying store is not HDFS — it is our company's in-house distributed file system.

Writing with Spark and reading with Spark SQL both work fine, but so far neither writing nor reading with Flink has succeeded.
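For context, the failure mode described in this thread can be reproduced with a minimal StreamingFileSink job. This is an illustrative sketch, not the poster's actual code; the URI, host, and port are placeholders. The key point is where it fails: at state initialization the sink asks the file system for a RecoverableWriter, and Flink's Hadoop wrapper only provides one for real HDFS, so any other Hadoop-compatible file system throws the UnsupportedOperationException quoted above.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object StreamingFileSinkRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // StreamingFileSink needs checkpointing to finalize in-progress files.
    env.enableCheckpointing(60 * 1000)

    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)

    // "hadoop://..." stands in for the in-house file system's URI. Because the
    // wrapped org.apache.hadoop.fs.FileSystem is not real HDFS, the call chain
    // HadoopFileSystem.createRecoverableWriter -> new HadoopRecoverableWriter
    // fails during initializeState with the exception from the thread.
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hadoop://namenode:8020/flink/out"),
                    new SimpleStringEncoder[String]("UTF-8"))
      .build()

    stream.addSink(sink)
    env.execute("streaming-file-sink-repro")
  }
}
```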
Re: How to choose the flink-shaded-hadoop-2-uber version
K8s HA with HDFS?

On Dec 22, 2020 at 13:43, liujian wrote:
Re: How to choose the flink-shaded-hadoop-2-uber version
Thanks — with the history server configured in flink-conf, HDFS access works and the web UI shows it.

Yang Wang wrote on Mon, Dec 21, 2020 at 3:08 PM:
Re: How to choose the flink-shaded-hadoop-2-uber version
The history-server has nothing to do with native K8s. If you want to use it, you need to deploy the history-server separately as its own Deployment inside the K8s cluster. What native K8s covers is how a Flink job is submitted natively to a K8s cluster.

Best,
Yang

liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 8:16 PM:
> Thanks — with the docker approach below, my test does work, but I don't know how to do this with native K8s; could you explain in more detail?
> My Dockerfile is one of the following two variants:
>
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh"]
> EXPOSE 6123 8081 8082
> CMD ["help","history-server"]
>
> ----
>
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
> EXPOSE 6123 8081 8082
> CMD ["help"]
>
> I have tried both; please advise.
Re: How to choose the flink-shaded-hadoop-2-uber version
Thanks — with the docker approach below, my test does work, but I don't know how to do this with native K8s; could you explain in more detail?

My Dockerfile is one of the following two variants:

COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081 8082
CMD ["help","history-server"]

----

COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
EXPOSE 6123 8081 8082
CMD ["help"]

I have tried both; please advise.

Yang Wang wrote on Mon, Dec 21, 2020 at 3:08 PM:
Re: How to choose the flink-shaded-hadoop-2-uber version
Yes, your understanding is right: a started history-server runs as a process listening on a port. It works fine on my side; you can start it with the following command:

docker run -p 8082:8082 --env FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs" flink:latest history-server

For more configuration options see:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang

liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 13:35:
> My understanding is that starting a history-server creates a process which then exposes the specified port, but I don't seem to see that effect. Is my understanding wrong?
Re: How to choose the flink-shaded-hadoop-2-uber version
My understanding is that starting a history-server creates a process which then exposes the specified port, but I don't seem to see that effect. Is my understanding wrong?
Re: How to choose the flink-shaded-hadoop-2-uber version
You don't need to modify CMD. The default entrypoint is docker-entrypoint.sh [1], which already supports history-server; just pass history-server as the argument.

[1]. https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Best,
Yang

liujian <13597820...@qq.com> wrote on Sun, Dec 20, 2020 at 12:45:
> Thanks, but I need to access the historyServer. What should I do? I changed the Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082, but that doesn't seem to have the intended effect.
Re: How to choose the flink-shaded-hadoop-2-uber version
Thanks, but I need to access the historyServer. What should I do? I changed the Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082, but that doesn't seem to have the intended effect.
Re: How to choose the flink-shaded-hadoop-2-uber version
You only need to set HADOOP_CONF_DIR in the environment on the Flink client side. The Flink client automatically packages hdfs-site.xml and core-site.xml into a dedicated ConfigMap and mounts it into the JobManager and TaskManager. Both files are also loaded onto the classpath automatically, so as long as flink-shaded-hadoop is in lib/, nothing else is needed and HDFS can be accessed directly.

Best,
Yang

liujian <13597820...@qq.com> wrote on Sat, Dec 19, 2020 at 20:29:
> HDFS is in HA mode and needs hdfs-site.xml. How should I handle that: with a ConfigMap, or by putting hdfs-site.xml into $FLINK_HOME/conf?
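The client-side setup described above amounts to a couple of shell lines before submitting. A minimal sketch; the /etc/hadoop/conf path is a hypothetical example, use wherever your cluster's Hadoop config actually lives:

```shell
# Point the Flink client at the cluster's Hadoop configuration before submitting.
# Flink then ships these files to the JobManager/TaskManager pods via a ConfigMap.
export HADOOP_CONF_DIR=/etc/hadoop/conf   # hypothetical location
echo "client will read: $HADOOP_CONF_DIR/core-site.xml and $HADOOP_CONF_DIR/hdfs-site.xml"
```

Because the HA nameservice definitions live in hdfs-site.xml, this also answers the HA question: the client picks them up from HADOOP_CONF_DIR, no manual ConfigMap needed.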
Re: How to choose the flink-shaded-hadoop-2-uber version
HDFS is in HA mode and needs hdfs-site.xml. How should I handle that: with a ConfigMap, or by putting hdfs-site.xml into $FLINK_HOME/conf?
Re: How to choose the flink-shaded-hadoop-2-uber version
To access HDFS from K8s you still need to put flink-shaded-hadoop into the lib directory, because Hadoop's FileSystem currently does not support loading as a plugin.

Best,
Yang

superainbower wrote on Wed, Dec 16, 2020 at 18:19:
> Piggybacking on this thread: how do you access HDFS when deploying on K8S? At the moment I still bake the shaded jar into the image.
> On Dec 16, 2020 at 10:53, Yang Wang wrote:
>
> Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example: 2.8.3 is the Hadoop version, and 10.0 is the version of flink-shaded [1].
>
> Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2]. This keeps Flink hadoop-free, so it supports both hadoop2 and hadoop3.
>
> If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
>
> [1]. https://github.com/apache/flink-shaded
> [2]. https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>
> Best,
> Yang
>
> 赢峰 wrote on Fri, Dec 11, 2020 at 08:45:
> > How should I choose the flink-shaded-hadoop-2-uber version?
> > What do the xxx-xxx parts mean?
Re: How to choose the flink-shaded-hadoop-2-uber version
Piggybacking on this thread: how do you access HDFS when deploying on K8S? At the moment I still bake the shaded jar into the image.

On Dec 16, 2020 at 10:53, Yang Wang wrote:
> Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example: 2.8.3 is the Hadoop version, and 10.0 is the version of flink-shaded [1].
>
> Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2]. This keeps Flink hadoop-free, so it supports both hadoop2 and hadoop3.
>
> If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
>
> [1]. https://github.com/apache/flink-shaded
> [2]. https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>
> Best,
> Yang
>
> 赢峰 wrote on Fri, Dec 11, 2020 at 08:45:
> > How should I choose the flink-shaded-hadoop-2-uber version?
> > What do the xxx-xxx parts mean?
Re: How to choose the flink-shaded-hadoop-2-uber version
Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example: 2.8.3 is the Hadoop version, and 10.0 is the version of flink-shaded [1].

Since 1.10 the community no longer recommends the flink-shaded-hadoop approach; instead, set the HADOOP_CLASSPATH environment variable when submitting [2]. This keeps Flink hadoop-free, so it supports both hadoop2 and hadoop3.

If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.

[1]. https://github.com/apache/flink-shaded
[2]. https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation

Best,
Yang

赢峰 wrote on Fri, Dec 11, 2020 at 08:45:
> How should I choose the flink-shaded-hadoop-2-uber version?
>
> What do the xxx-xxx parts mean?
Re: How to determine the flink-shaded-hadoop-2-uber*-* version
You need to make sure that `hadoop classpath` returns the complete classpath; normally that command includes all the hadoop jars. If you get errors about missing classes or methods, check that the corresponding jar exists and is actually included.

The community recommends the `hadoop classpath` approach mainly to keep Flink hadoop-free, so that it runs on both hadoop2 and hadoop3.

Best,
Yang

Jacob <17691150...@163.com> wrote on Tue, Dec 15, 2020 at 09:25:
> Thanks for the reply!
>
> I have read that document too.
>
> A few days ago I tested job submission with Flink clients from 1.9 through 1.12. On 1.10+, manually running export HADOOP_CLASSPATH=`hadoop classpath` had no effect: I got all kinds of errors, mostly missing Hadoop classes or methods (NoSuchMethod and the like), and changing the pom back and forth did not help. In the end, only adding the flink-shaded-hadoop-2-uber*-* dependency to the pom let me submit and run the job normally.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
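The sanity check suggested above can be scripted before every submission. A hedged sketch: the fallback path is purely hypothetical, shown only so the guard is self-contained on machines without the hadoop CLI:

```shell
# Populate HADOOP_CLASSPATH from the hadoop CLI when it is available;
# otherwise fall back to an illustrative, hypothetical install location.
if command -v hadoop >/dev/null 2>&1; then
  export HADOOP_CLASSPATH="$(hadoop classpath)"
else
  export HADOOP_CLASSPATH="/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/*"
fi
# Fail early here instead of hitting NoClassDefFoundError inside Flink later.
[ -n "$HADOOP_CLASSPATH" ] && echo "HADOOP_CLASSPATH is set"
```

If the variable comes back empty or truncated, that points at the hadoop installation rather than at Flink.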
Re: How to determine the flink-shaded-hadoop-2-uber*-* version
Thanks for the reply!

I have read that document too.

A few days ago I tested job submission with Flink clients from 1.9 through 1.12. On 1.10+, manually running export HADOOP_CLASSPATH=`hadoop classpath` had no effect: I got all kinds of errors, mostly missing Hadoop classes or methods (NoSuchMethod and the like), and changing the pom back and forth did not help. In the end, only adding the flink-shaded-hadoop-2-uber*-* dependency to the pom let me submit and run the job normally.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: How to determine the flink-shaded-hadoop-2-uber*-* version
Flink no longer recommends putting the hadoop jars into lib. You can load the hadoop dependencies with:

export HADOOP_CLASSPATH=`hadoop classpath`

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes

--
Sent from: http://apache-flink.147419.n8.nabble.com/
How to determine the flink-shaded-hadoop-2-uber*-* version
When upgrading Flink I need to put this package into flink/lib, but how do I determine which version of it to use?

flink-shaded-hadoop-2-uber*-*

--
Sent from: http://apache-flink.147419.n8.nabble.com/
How to choose the flink-shaded-hadoop-2-uber version
How should I choose the flink-shaded-hadoop-2-uber version?

What do the xxx-xxx parts mean?
Re: Re:Re: Re: How to configure hadoop for Flink 1.11.1 on k8s
Hi, you can build the image directly on one of the hadoop nodes: package the hadoop dependencies you need together with Flink into the docker image, then set the environment variables and it is ready to use. If the node where you deploy docker already has hadoop or Flink, you can also mount them from outside. We currently use the first approach.

Yang Wang wrote on Mon, Oct 12, 2020 at 10:23:
> Just base it on the community image, add one layer (copying flink-shaded-hadoop), commit the docker image, and push it to your docker registry.
>
> For example, the Dockerfile can look like:
> FROM flink:1.11.1-scala_2.11
> COPY flink-shaded-hadoop-2*.jar /opt/flink/lib/
>
> flink-shaded-hadoop can be downloaded here [1]
>
> [1]. https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
>
> Best,
> Yang
>
> yang wrote on Sat, Oct 10, 2020 at 17:50:
> > Could I ask: when rebuilding the image, do you unpack the original package and then repackage it?
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:Re: Re: How to configure hadoop for Flink 1.11.1 on k8s
Just base it on the community image, add one layer (copying flink-shaded-hadoop), commit the docker image, and push it to your docker registry.

For example, the Dockerfile can look like:

FROM flink:1.11.1-scala_2.11
COPY flink-shaded-hadoop-2*.jar /opt/flink/lib/

flink-shaded-hadoop can be downloaded here [1]

[1]. https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2

Best,
Yang

yang wrote on Sat, Oct 10, 2020 at 17:50:
> Could I ask: when rebuilding the image, do you unpack the original package and then repackage it?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
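End to end, Yang's steps might look like the following sketch. The registry name and the exact shaded-jar filename are hypothetical (pick the real one from the Maven link above); the build and push commands are left as comments since they need a running Docker daemon:

```shell
# Write the two-line Dockerfile from the message; the shaded jar must sit
# next to it in the build context.
cat > Dockerfile <<'EOF'
FROM flink:1.11.1-scala_2.11
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/
EOF

# Then build and push (hypothetical registry/tag):
# docker build -t my-registry.example.com/flink-hadoop:1.11.1 .
# docker push my-registry.example.com/flink-hadoop:1.11.1
grep -c '^COPY' Dockerfile
```

No unpacking or repackaging of the original image is involved; the new layer only adds one jar on top of it.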
Re: Re:Re: Re: How to configure hadoop for Flink 1.11.1 on k8s
Could I ask how you rebuilt the image? Do you unpack the original jar and then repackage it?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:Re: Re: How to configure hadoop for Flink 1.11.1 on k8s
Could I ask: when rebuilding the image, do you unpack the original package and then repackage it?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Issues with Flink Batch and Hadoop dependency
Hi Dan,

Your approach in general is good. You might want to use the bundled hadoop uber jar [1] to save some time if you find the appropriate version. You can also build your own version and include it in lib/.

In general, I'd recommend moving away from sequence files: as soon as you change your records even minimally, everything falls apart. Going with established binary formats like Avro or Parquet is usually preferable, also because of the additional tooling, and pays off quickly in the long run.

[1] https://flink.apache.org/downloads.html#additional-components

On Sat, Aug 29, 2020 at 10:50 PM Dan Hill wrote:
> I was able to get a basic version to work by including a bunch of hadoop
> and s3 dependencies in the job jar and hacking in some hadoop config
> values. It's probably not optimal but it looks like I'm unblocked.
>
> On Fri, Aug 28, 2020 at 12:11 PM Dan Hill wrote:
>
>> I'm assuming I have a simple, common setup problem. I've spent 6 hours
>> debugging and haven't been able to figure it out. Any help would be
>> greatly appreciated.
>>
>> *Problem*
>> I have a Flink Streaming job setup that writes SequenceFiles in S3. When
>> I try to create a Flink Batch job to read these Sequence files, I get the
>> following error:
>>
>> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>>
>> It fails on this readSequenceFile.
>> >> env.createInput(HadoopInputs.readSequenceFile(Text.class, >> ByteWritable.class, INPUT_FILE)) >> >> If I directly depend on org-apache-hadoop/hadoop-mapred when building the >> job, I get the following error when trying to run the job: >> >> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No >> FileSystem for scheme "s3" >> at >> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332) >> at >> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352) >> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124) >> at >> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) >> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) >> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477) >> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) >> at >> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209) >> at >> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48) >> at >> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254) >> at >> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150) >> at >> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58) >> at >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257) >> >> >> *Extra context* >> I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink> >> for creating Flink. I'm using v1.10.1. >> >> >> *Questions* >> Are there any existing projects that read batch Hadoop file formats from >> S3? >> >> I've looked at these instructions for Hadoop Integration >> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>. >> I'm assuming my configuration is wrong. 
I'm also assuming I need the >> hadoop dependency properly setup in the jobmanager and taskmanager (not in >> the job itself). If I use this Helm chart, do I need to download a hadoop >> common jar into the Flink images for jobmanager and taskmanager? Are there >> pre-built images which I can use that already have the dependencies setup? >> >> >> - Dan >> > -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
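On the `No FileSystem for scheme "s3"` part of this thread: besides the uber jar, Flink ships an S3 filesystem that is normally enabled through the plugins mechanism, with each filesystem in its own subdirectory under plugins/. A sketch of the layout, using a temp directory as a stand-in for a real install (the version in the jar name just mirrors the 1.10.1 mentioned in the thread):

```shell
# Stand-in for $FLINK_HOME; a real install already ships opt/ with the S3 jars.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/plugins/s3-fs-hadoop"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.10.1.jar"   # placeholder jar for this demo

# The actual enabling step: copy the bundled jar into its own plugins subdir,
# so the jobmanager/taskmanager can resolve s3:// paths.
cp "$FLINK_HOME/opt/"flink-s3-fs-hadoop-*.jar "$FLINK_HOME/plugins/s3-fs-hadoop/"
ls "$FLINK_HOME/plugins/s3-fs-hadoop"
```

In the Helm/image setup from the thread this would mean baking the plugins/ directory into the jobmanager and taskmanager images, not into the job jar.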
Re: Issues with Flink Batch and Hadoop dependency
I was able to get a basic version to work by including a bunch of hadoop and s3 dependencies in the job jar and hacking in some hadoop config values. It's probably not optimal but it looks like I'm unblocked. On Fri, Aug 28, 2020 at 12:11 PM Dan Hill wrote: > I'm assuming I have a simple, common setup problem. I've spent 6 hours > debugging and haven't been able to figure it out. Any help would be > greatly appreciated. > > > *Problem* > I have a Flink Streaming job setup that writes SequenceFiles in S3. When > I try to create a Flink Batch job to read these Sequence files, I get the > following error: > > NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat > > It fails on this readSequenceFile. > > env.createInput(HadoopInputs.readSequenceFile(Text.class, > ByteWritable.class, INPUT_FILE)) > > If I directly depend on org-apache-hadoop/hadoop-mapred when building the > job, I get the following error when trying to run the job: > > Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No > FileSystem for scheme "s3" > at > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332) > at > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124) > at > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477) > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) > at > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209) > at > org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48) > at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254) > at > org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150) > at > 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257) > > > *Extra context* > I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink> > for creating Flink. I'm using v1.10.1. > > > *Questions* > Are there any existing projects that read batch Hadoop file formats from > S3? > > I've looked at these instructions for Hadoop Integration > <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>. > I'm assuming my configuration is wrong. I'm also assuming I need the > hadoop dependency properly setup in the jobmanager and taskmanager (not in > the job itself). If I use this Helm chart, do I need to download a hadoop > common jar into the Flink images for jobmanager and taskmanager? Are there > pre-built images which I can use that already have the dependencies setup? > > > - Dan >
Issues with Flink Batch and Hadoop dependency
I'm assuming I have a simple, common setup problem. I've spent 6 hours debugging and haven't been able to figure it out. Any help would be greatly appreciated. *Problem* I have a Flink Streaming job setup that writes SequenceFiles in S3. When I try to create a Flink Batch job to read these Sequence files, I get the following error: NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat It fails on this readSequenceFile. env.createInput(HadoopInputs.readSequenceFile(Text.class, ByteWritable.class, INPUT_FILE)) If I directly depend on org-apache-hadoop/hadoop-mapred when building the job, I get the following error when trying to run the job: Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3" at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209) at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254) at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150) at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257) *Extra context* I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink> for creating Flink. I'm using v1.10.1. 
*Questions* Are there any existing projects that read batch Hadoop file formats from S3? I've looked at these instructions for Hadoop Integration <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>. I'm assuming my configuration is wrong. I'm also assuming I need the hadoop dependency properly setup in the jobmanager and taskmanager (not in the job itself). If I use this Helm chart, do I need to download a hadoop common jar into the Flink images for jobmanager and taskmanager? Are there pre-built images which I can use that already have the dependencies setup? - Dan
Re: Re: hive-exec dependency causes a hadoop conflict
OK, thanks for the reply. With the hive version pinned to 2.1.1, I chose to import the hive-exec-2.1.1 and flink-connector-hive_2.11-1.11.1 dependencies in my program, and I can now operate on hive tables normally.

best,
amenhub

From: Rui Li
Sent: 2020-08-24 21:33
To: user-zh
Subject: Re: hive-exec dependency causes a hadoop conflict
Hi,
hive-exec itself does not contain Hadoop. If Hadoop is pulled in as a transitive Maven dependency, you can exclude it at packaging time; at runtime you can use your cluster's Hadoop version instead of the Hadoop version hive itself depends on. For Flink 1.11 you can also consider the official flink-sql-connector-hive uber jar, which bundles all hive dependencies (the Hadoop dependency still needs to be added separately). See the documentation for details [1][2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes

On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com wrote:
> To add: when I removed the hadoop dependencies brought in by hive-exec and friends, the job still failed, so maybe something on my side is off. I suspected a dependency conflict because, before testing the hive integration, submitting to yarn worked fine, which is why I started looking at hive. Now it may well be some other cause. A short excerpt of the stack:
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
> ...
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
> ...
>
> best,
> amenhub
Re: hive-exec dependency causes a hadoop conflict
Hi,
hive-exec itself does not contain Hadoop. If Hadoop is pulled in as a transitive Maven dependency, you can exclude it at packaging time; at runtime you can use your cluster's Hadoop version instead of the Hadoop version hive itself depends on. For Flink 1.11 you can also consider the official flink-sql-connector-hive uber jar, which bundles all hive dependencies (the Hadoop dependency still needs to be added separately). See the documentation for details [1][2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes

On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com wrote:
> To add: when I removed the hadoop dependencies brought in by hive-exec and friends, the job still failed, so maybe something on my side is off. I suspected a dependency conflict because, before testing the hive integration, submitting to yarn worked fine, which is why I started looking at hive. Now it may well be some other cause. A short excerpt of the stack:
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
> ...
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
> ...
>
> best,
> amenhub
>
> From: amen...@163.com
> Sent: 2020-08-24 20:40
> To: user-zh
> Subject: hive-exec dependency causes a hadoop conflict
> hi, everyone
>
> Component versions: flink-1.11.1, hive-2.1.1
>
> Problem description:
> Using the Table API's executeSql() I wrote a kafka-to-mysql streaming demo. Without the hive-exec dependency, the packaged job runs fine on the yarn cluster.
>
> When testing HiveCatalog and reading/writing Hive tables, everything works on a Standalone Cluster and Flink reads and writes hive tables normally (no hadoop dependency conflict).
>
> But when submitted to yarn, a hadoop conflict occurs. Inspecting the dependency tree in IDEA shows that hive-exec transitively pulls in hadoop and hdfs jars at version 2.6.1, which then conflict with the hadoop jars of the yarn cluster (hadoop-3.0.0-cdh-6.2.0).
>
> Has anyone in the community hit this? The docs suggest downloading your own hive-exec when there is no officially provided hive bundle, but that implicitly brings in a hadoop version different from the cluster's, which is bound to conflict. Is there something I have not set up correctly?
>
> best,
> amenhub

--
Best regards!
Rui Li
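The "exclude it at packaging time" advice in this message corresponds to a Maven exclusion on the hive-exec dependency. A sketch of the pom fragment, written to a file here only so it can be shown and checked from a shell; the coordinates follow the standard Hive conventions and the version is the one from this thread:

```shell
cat > hive-exec-dependency.xml <<'EOF'
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>2.1.1</version>
  <exclusions>
    <!-- Drop hive-exec's transitive Hadoop 2.6.1 jars; the cluster provides Hadoop. -->
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
EOF

# After editing the real pom.xml, verify nothing from org.apache.hadoop leaks through:
# mvn dependency:tree -Dincludes=org.apache.hadoop
grep -c '<exclusion>' hive-exec-dependency.xml
```

The wildcard artifactId in an exclusion requires Maven 3.2.1 or newer; on older Maven you would list each Hadoop artifact explicitly.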
Re: hive-exec dependency causes a hadoop conflict
To add: when I removed the hadoop dependencies brought in by hive-exec and friends, the job still failed, so maybe something on my side is off. I suspected a dependency conflict because, before testing the hive integration, submitting to yarn worked fine, which is why I started looking at hive. Now it may well be some other cause. A short excerpt of the stack:

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
... 19 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
... 24 more

best,
amenhub

From: amen...@163.com
Sent: 2020-08-24 20:40
To: user-zh
Subject: hive-exec dependency causes a hadoop conflict
hi, everyone

Component versions: flink-1.11.1, hive-2.1.1

Problem description:
Using the Table API's executeSql() I wrote a kafka-to-mysql streaming demo. Without the hive-exec dependency, the packaged job runs fine on the yarn cluster.

When testing HiveCatalog and reading/writing Hive tables, everything works on a Standalone Cluster and Flink reads and writes hive tables normally (no hadoop dependency conflict).

But when submitted to yarn, a hadoop conflict occurs. Inspecting the dependency tree in IDEA shows that hive-exec transitively pulls in hadoop and hdfs jars at version 2.6.1, which then conflict with the hadoop jars of the yarn cluster (hadoop-3.0.0-cdh-6.2.0).

Has anyone in the community hit this? The docs suggest downloading your own hive-exec when there is no officially provided hive bundle, but that implicitly brings in a hadoop version different from the cluster's, which is bound to conflict. Is there something I have not set up correctly?

best,
amenhub