Just so everyone knows, we figured it out: it was an environment problem.

In our case our cluster lives in a network that is not directly
accessible, so to deploy we need to use Jenkins with some slaves that have
access to that network.

During deployment, in the *main* method of the class, we execute
*FileSystems.setDefaultPipelineOptions(_options);* which triggers
the *HadoopFileSystemOptionsRegistrar*
via the ServiceLoader mechanism, and this reads the environment
variable *HADOOP_CONF_DIR*
in order to correctly register the filesystem.

So, it's very important that the machine you use for deployment has
that environment variable set as well (not only the workers where the
pipeline will run).
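A quick sanity check on the deployment machine can catch this before launch. This is just a hypothetical pre-flight sketch (the function name is mine, not anything from Beam):

```shell
# Hypothetical pre-deployment check: fail fast when the environment
# variable read by HadoopFileSystemOptionsRegistrar is missing.
check_hadoop_conf() {
  if [ -z "${HADOOP_CONF_DIR:-}" ]; then
    echo "HADOOP_CONF_DIR is not set; scheme hdfs will not register" >&2
    return 1
  fi
  echo "HADOOP_CONF_DIR=${HADOOP_CONF_DIR}"
}
```

Running something like this in the Jenkins job before invoking flink run would have surfaced the problem at deploy time instead of at runtime.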

In our case the variable was set in the .bashrc of the user used for
deployment, but here is the catch.

We were using "sudo -u DEPLOYMENT_USER *-s* /var/lib/flink/bin/flink run -d
.........", but the *-s* flag does not source the user's .bashrc
(.bash_profile), hence the failures at runtime. The fix was simply
replacing the *-s* flag with *-i* to make sure the environment variable is
present when the command runs.
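As a minimal illustration of why the flag matters (using env -i and a throwaway profile file to stand in for the deployment user's login files — no sudo required, and the path is illustrative):

```shell
# Illustrative only: simulate the deployment user's ~/.bash_profile with
# a throwaway file, and compare a non-login shell (like `sudo -s`) with
# a shell that sources the profile first (like `sudo -i`).
profile=/tmp/deploy_user_profile
echo 'export HADOOP_CONF_DIR=/etc/hadoop/conf' > "$profile"

# `sudo -u user -s cmd` style: login files are never sourced.
env -i bash -c 'echo "non-login: ${HADOOP_CONF_DIR:-unset}"'
# prints "non-login: unset"

# `sudo -u user -i cmd` style: the login shell sources the profile first.
env -i bash -c "source $profile && "'echo "login: ${HADOOP_CONF_DIR:-unset}"'
# prints "login: /etc/hadoop/conf"
```

The first invocation is what we were effectively running: the pipeline started in an environment where HADOOP_CONF_DIR never existed, so the hdfs scheme was never registered.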

Thanks


On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia <jcgarc...@gmail.com>
wrote:

> Hi Tim,
>
> I am using FileIO directly with the AvroIO.sink(...), however having
> experienced BEAM-2277 with the SparkRunner a few months ago, I get the
> feeling this is something different (maybe some dependency
> mismatch/missing).
>
> Thanks
>
> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Hi Juan
>>
>> This sounds reminiscent of
>> https://issues.apache.org/jira/browse/BEAM-2277 which we believed was
>> fixed in 2.7.0.
>> What IO are you using to write your files and can you paste a snippet of
>> your code please?
>>
>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
>> workaround too):
>>
>>   transform.apply("Write",
>>       AvroIO.writeGenericRecords(schema)
>>           .to(FileSystems.matchNewResource(options.getTarget(), true))
>>           // BEAM-2277 workaround
>>           .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>>
>>
>> Thanks
>> Tim
>>
>>
>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jcgarc...@gmail.com>
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I have a strange situation while running Beam 2.7.0 with the
>>> FlinkRunner. My setup consists of an HA Flink cluster (1.5.4) writing its
>>> checkpoints to HDFS. Flink is able to correctly write its checkpoints /
>>> savepoints to HDFS without any problems.
>>>
>>> However, my pipeline has to write to HDFS as well, but fails with
>>> "Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs"
>>> (stacktrace at the bottom)
>>>
>>> In the host where the pipeline is running:
>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>> 2. During pipeline construction I am explicitly calling
>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>>> ServiceLoader to find all filesystem registrars on the classpath.
>>> 3. If I inspect the SCHEME_TO_FILESYSTEM property of the FileSystems
>>> class in my main method using reflection, I can see that at
>>> launch time it contains:
>>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>
>>> Any idea what I am doing wrong with the HDFS integration?
>>>
>>> {snippet}
>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>> Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>> f.setAccessible(true);
>>> AtomicReference<Map<String, FileSystem>> value =
>>>     (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>> System.out.println("===========================");
>>> System.out.println(value);
>>> {snippet}
>>>
>>> {stacktrace}
>>> Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs
>>>         at
>>> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>>         at
>>> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>>         at
>>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>>         at
>>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>>
>>> {stacktrace}
>>>
>>> --
>>>
>>> JC
>>>
>>>
>
> --
>
> JC
>
>

-- 

JC
