Can anyone please help me with the conf files? Am I missing anything on the configuration part?
Regards,
Pritam.

On Tue, 15 Oct 2019 at 08:48, Pritam Sadhukhan <sadhukhan.pri...@gmail.com> wrote:

> Thanks for the information.
>
> I am able to see all the files using the hdfs shell command.
> I am even able to pull the data into Flink with
>
> environment.readTextFile("hdfs://host:port/qlake/logs/sa_structured_events")
>
> The issue is only with the OrcTableSource implementation.
> Here are my configuration files.
>
> *flink-conf.yaml:*
>
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
>
> #==============================================================================
> # Common
> #==============================================================================
>
> # The external address of the host on which the JobManager runs and can be
> # reached by the TaskManagers and any clients which want to connect. This setting
> # is only used in Standalone mode and may be overwritten on the JobManager side
> # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
> # In high availability mode, if you use the bin/start-cluster.sh script and setup
> # the conf/masters file, this will be taken care of automatically. Yarn/Mesos
> # automatically configure the host name based on the hostname of the node where the
> # JobManager runs.
>
> jobmanager.rpc.address: localhost
>
> # The RPC port where the JobManager is reachable.
>
> jobmanager.rpc.port: 6123
>
> # The heap size for the JobManager JVM
>
> jobmanager.heap.size: 1024m
>
> # The heap size for the TaskManager JVM
>
> taskmanager.heap.size: 1024m
>
> # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
>
> taskmanager.numberOfTaskSlots: 8
>
> # The parallelism used for programs that did not specify any other parallelism.
>
> parallelism.default: 1
>
> # The default file system scheme and authority.
> #
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==============================================================================
> # High Availability
> #==============================================================================
>
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> #
> # high-availability: zookeeper
>
> # The path where metadata for master recovery is persisted. While ZooKeeper stores
> # the small ground truth for checkpoint and leader election, this location stores
> # the larger objects, like persisted dataflow graphs.
> #
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> #
> # high-availability.storageDir: hdfs:///flink/ha/
>
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
> #
> # high-availability.zookeeper.quorum: localhost:2181
>
> # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
> # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
> # The default value is "open" and it can be changed to "creator" if ZK security is enabled
> #
> # high-availability.zookeeper.client.acl: open
>
> #==============================================================================
> # Fault tolerance and checkpointing
> #==============================================================================
>
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # <class-name-of-factory>.
> #
> # state.backend: filesystem
>
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends.
> #
> # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
>
> # Default target directory for savepoints, optional.
> #
> # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
>
> # Flag to enable/disable incremental checkpoints for backends that
> # support incremental checkpoints (like the RocksDB state backend).
> #
> # state.backend.incremental: false
>
> #==============================================================================
> # Web Frontend
> #==============================================================================
>
> # The address under which the web-based runtime monitor listens.
> #
> #web.address: 0.0.0.0
>
> # The port under which the web-based runtime monitor listens.
> # A value of -1 deactivates the web server.
>
> rest.port: 8081
>
> # Flag to specify whether job submission is enabled from the web-based
> # runtime monitor. Uncomment to disable.
>
> #web.submit.enable: false
>
> #==============================================================================
> # Advanced
> #==============================================================================
>
> # Override the directories for temporary files. If not specified, the
> # system-specific Java temporary directory (java.io.tmpdir property) is taken.
> #
> # For framework setups on Yarn or Mesos, Flink will automatically pick up the
> # containers' temp directories without any need for configuration.
> #
> # Add a delimited list for multiple directories, using the system directory
> # delimiter (colon ':' on unix) or a comma, e.g.:
> # /data1/tmp:/data2/tmp:/data3/tmp
> #
> # Note: Each directory entry is read from and written to by a different I/O
> # thread. You can include the same directory multiple times in order to create
> # multiple I/O threads against that directory. This is for example relevant for
> # high-throughput RAIDs.
> #
> # io.tmp.dirs: /tmp
>
> # Specify whether TaskManager's managed memory should be allocated when starting
> # up (true) or when memory is requested.
> #
> # We recommend to set this value to 'true' only in setups for pure batch
> # processing (DataSet API). Streaming setups currently do not use the TaskManager's
> # managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
> # while the 'memory' and 'filesystem' backends explicitly keep data as objects
> # to save on serialization cost.
> #
> # taskmanager.memory.preallocate: false
>
> # The classloading resolve order. Possible values are 'child-first' (Flink's default)
> # and 'parent-first' (Java's default).
> #
> # Child first classloading allows users to use different dependency/library
> # versions in their application than those in the classpath. Switching back
> # to 'parent-first' may help with debugging dependency issues.
> #
> # classloader.resolve-order: child-first
>
> # The amount of memory going to the network stack. These numbers usually need
> # no tuning. Adjusting them may be necessary in case of an "Insufficient number
> # of network buffers" error. The default min is 64MB, the default max is 1GB.
> #
> # taskmanager.network.memory.fraction: 0.1
> # taskmanager.network.memory.min: 64mb
> # taskmanager.network.memory.max: 1gb
>
> #==============================================================================
> # Flink Cluster Security Configuration
> #==============================================================================
>
> # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
> # may be enabled in four steps:
> # 1. configure the local krb5.conf file
> # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
> # 3. make the credentials available to various JAAS login contexts
> # 4. configure the connector to use JAAS/SASL
>
> # The below configure how Kerberos credentials are provided. A keytab will be used instead of
> # a ticket cache if the keytab path and principal are set.
>
> # security.kerberos.login.use-ticket-cache: true
> # security.kerberos.login.keytab: /path/to/kerberos/keytab
> # security.kerberos.login.principal: flink-user
>
> # The configuration below defines which JAAS login contexts
>
> # security.kerberos.login.contexts: Client,KafkaClient
>
> #==============================================================================
> # ZK Security Configuration
> #==============================================================================
>
> # Below configurations are applicable if ZK ensemble is configured for security
>
> # Override below configuration to provide custom ZK service name if configured
> # zookeeper.sasl.service-name: zookeeper
>
> # The configuration below must match one of the values set in "security.kerberos.login.contexts"
> # zookeeper.sasl.login-context-name: Client
>
> #==============================================================================
> # HistoryServer
> #==============================================================================
>
> # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
>
> # Directory to upload completed jobs to. Add this directory to the list of
> # monitored directories of the HistoryServer as well (see below).
> #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
>
> # The address under which the web-based HistoryServer listens.
> #historyserver.web.address: 0.0.0.0
>
> # The port under which the web-based HistoryServer listens.
> #historyserver.web.port: 8082
>
> # Comma separated list of directories to monitor for completed jobs.
> #historyserver.archive.fs.dir: hdfs:///completed-jobs/
>
> # Interval in milliseconds for refreshing the monitored directories.
> #historyserver.archive.fs.refresh-interval: 10000
>
> akka.ask.timeout: 1000 s
> akka.client.timeout: 1000 s
> akka.lookup.timeout: 1000 s
>
> web.timeout: 1000000
> taskmanager.debug.memory.log: true
>
> *hdfs-site.xml:*
>
> <?xml version="1.0" encoding="UTF-8"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
> <!--
>   Licensed under the Apache License, Version 2.0 (the "License");
>   you may not use this file except in compliance with the License.
>   You may obtain a copy of the License at
>
>     http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License. See accompanying LICENSE file.
> -->
>
> <!-- Put site-specific property overrides in this file. -->
>
> <configuration>
>
> </configuration>
>
> On Tue, 15 Oct 2019 at 08:38, 刘芃成 <pengchengliu_b...@163.com> wrote:
>
>> Maybe you can paste your Flink configuration and hdfs-site.xml and check
>> whether there are any problems in the HDFS file-system-related conf. Also, you
>> should check whether this path really exists on HDFS with an hdfs shell
>> command (e.g. hdfs dfs -ls /xxx, see
>> https://hadoop.apache.org/docs/r2.7.5/hadoop-project-dist/hadoop-common/FileSystemShell.html
>> )
>>
>> At 2019-10-15 01:27:39, "Pritam Sadhukhan" <sadhukhan.pri...@gmail.com> wrote:
>> >Hi,
>> >
>> >I am trying to use OrcTableSource to fetch data stored in Hive tables on
>> >HDFS.
>> >I am able to use OrcTableSource to fetch and deserialize the data on a
>> >local cluster.
>> >
>> >But when I try to use the HDFS path, it throws a file-not-found error.
>> >
>> >Any help on this topic will be appreciated.
>> >
>> >Versions:
>> >
>> >Flink: 1.7.1
>> >Hive: 2.3.4
>> >
>> >*Code snippet:*
>> >
>> >import org.apache.flink.api.java.DataSet;
>> >import org.apache.flink.api.java.ExecutionEnvironment;
>> >import org.apache.flink.configuration.Configuration;
>> >import org.apache.flink.core.fs.FileSystem;
>> >import org.apache.flink.orc.OrcTableSource;
>> >import org.apache.flink.table.api.java.BatchTableEnvironment;
>> >import org.apache.flink.table.api.Table;
>> >import org.apache.flink.table.api.TableEnvironment;
>> >import org.apache.flink.types.Row;
>> >
>> >final ExecutionEnvironment environment = ExecutionEnvironment
>> >        .getExecutionEnvironment();
>> >BatchTableEnvironment tableEnvironment =
>> >        TableEnvironment.getTableEnvironment(environment);
>> >OrcTableSource orcTS = OrcTableSource.builder()
>> >        .path("hdfs://host:port/logs/sa_structured_events")
>> >        .forOrcSchema(new OrcSchemaProvider().getStructuredEventsSchema())
>> >        .build();
>> >
>> >tableEnvironment.registerTableSource("OrcTable", orcTS);
>> >Table result = tableEnvironment.sqlQuery("SELECT * FROM OrcTable");
>> >
>> >DataSet<Row> rowDataSet = tableEnvironment.toDataSet(result, Row.class);
>> >
>> >tableEnvironment.execEnv().execute();
>> >
>> >
>> >*Error:*
>> >2019-10-14 16:56:26,048 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (OrcFile[path=hdfs://host:port/logs/sa_structured_events, schema=struct<customerid:string,eventid:string,subtype:st) (1/1) (9e1ad40a0f0b80ef0ad8d3b2fc58816d) switched from RUNNING to FAILED.
>> >java.io.FileNotFoundException: File /logs/sa_structured_events/part-00000-b2562d39-1097-490c-99dd-672ed12bbb10-c000.snappy.orc does not exist
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:635)
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
>> >    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
>> >    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
>> >    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
>> >    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>> >    at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:517)
>> >    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
>> >    at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
>> >    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
>> >    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
>> >    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
>> >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> >    at java.lang.Thread.run(Unknown Source)
>> >2019-10-14 16:56:26,048 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Mon Oct 14 16:56:07 IST 2019 (26a54fbcbd46cd0c4796e7308a2ba3b0) switched from state RUNNING to FAILING.
>> >java.io.FileNotFoundException: File /logs/sa_structured_events/part-00000-b2562d39-1097-490c-99dd-672ed12bbb10-c000.snappy.orc does not exist
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:635)
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
>> >    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
>> >    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
>> >    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
>> >    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
>> >    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>> >    at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:517)
>> >    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
>> >    at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
>> >    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
>> >    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
>> >    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
>> >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> >    at java.lang.Thread.run(Unknown Source)
>> >
>> >
>> >Regards,
>> >Pritam.
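
A note on reading the trace: the failing frames are in org.apache.hadoop.fs.RawLocalFileSystem, and the path in the exception has no hdfs://host:port prefix, although the DataSource was created with one. One possible reading is that the ORC reader received only the path component and therefore resolved it against the local file system. The sketch below uses only java.net.URI to show what is lost when just the path component of a location is kept; it is not Flink's or Hadoop's actual code, and the class name SchemeCheck and the host namenode.example:8020 are made up for illustration.

```java
import java.net.URI;

// Illustration only: shows what remains of an HDFS location once the
// scheme and authority are dropped. SchemeCheck is a hypothetical name.
class SchemeCheck {

    // Keeps only the path component, discarding scheme, host and port.
    static String pathOnly(String location) {
        return URI.create(location).getPath();
    }

    // Returns the scheme of a location, or "(none)" if it has no scheme.
    static String schemeOf(String location) {
        String scheme = URI.create(location).getScheme();
        return scheme == null ? "(none)" : scheme;
    }

    public static void main(String[] args) {
        // Hypothetical NameNode host and port, standing in for host:port.
        String full = "hdfs://namenode.example:8020/logs/sa_structured_events";
        String stripped = pathOnly(full);

        // The full location still says which file system to use.
        System.out.println(schemeOf(full) + " -> " + full);
        // The stripped one does not, so a default file system must be assumed.
        System.out.println(schemeOf(stripped) + " -> " + stripped);
    }
}
```

If this is what happens here, the Hadoop configuration visible to the ORC reader (for example fs.defaultFS in core-site.xml) would decide which file system a scheme-less path resolves to, so it may be worth checking that Flink picks up the cluster's Hadoop configuration directory and not only the empty hdfs-site.xml shown above.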