[ https://issues.apache.org/jira/browse/HADOOP-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924144#comment-17924144 ]

ASF GitHub Bot commented on HADOOP-19354:
-----------------------------------------

ahmarsuhail commented on code in PR #7214:
URL: https://github.com/apache/hadoop/pull/7214#discussion_r1943106972


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4354,22 +4257,25 @@ public void close() throws IOException {
   protected synchronized void stopAllServices() {
     try {
       trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
-        closeAutocloseables(LOG, store);
+        closeAutocloseables(LOG, getStore());

Review Comment:
   @steveloughran can you explain how the lifecycle works with the Service classes?
   
   We're doing `closeAutocloseables(LOG, getStore());` here, but Store doesn't actually implement AutoCloseable, so I'm not sure whether we need to do something else here (call serviceStop()?).
   
   I also need to figure out what changes are needed to close our s3SeekableInputStreamFactory. Prior to these changes, I was passing our AAL factory into `closeAutocloseables()`, which would call `s3SeekableInputStreamFactory.close()`.
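The question above can be illustrated with a small, self-contained model, assuming the store becomes a composite service as the Jira description proposes. `SimpleService`, `SimpleCompositeService`, `RecordingService` and `SketchStore` are hypothetical stand-ins, not Hadoop's API. They mirror two behaviours of Hadoop's `org.apache.hadoop.service` package: `Service` extends `Closeable`, with `AbstractService.close()` delegating to `stop()`, and `CompositeService` stops its children in reverse order of registration. Under that model, stopping or closing the store also stops a child stream factory, which is where a factory such as AAL's could close its `s3SeekableInputStreamFactory`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified lifecycle base class (NOT Hadoop's AbstractService). */
abstract class SimpleService implements Closeable {
  enum State { NOTINITED, INITED, STARTED, STOPPED }

  private final String name;
  private State state = State.NOTINITED;

  SimpleService(String name) { this.name = name; }

  String getName() { return name; }

  State getState() { return state; }

  final void init() { serviceInit(); state = State.INITED; }

  final void start() { serviceStart(); state = State.STARTED; }

  /** Idempotent: stopping an already-stopped service does nothing. */
  final void stop() {
    if (state == State.STOPPED) {
      return;
    }
    state = State.STOPPED;
    serviceStop();
  }

  /** close() delegates to stop(), so every service is also a Closeable. */
  @Override
  public final void close() { stop(); }

  protected void serviceInit() { }
  protected void serviceStart() { }
  protected void serviceStop() { }
}

/** Hypothetical composite: registered children follow the parent's lifecycle. */
class SimpleCompositeService extends SimpleService {
  private final List<SimpleService> children = new ArrayList<>();

  SimpleCompositeService(String name) { super(name); }

  protected void addService(SimpleService child) { children.add(child); }

  @Override
  protected void serviceInit() { children.forEach(SimpleService::init); }

  @Override
  protected void serviceStart() { children.forEach(SimpleService::start); }

  /** Children are stopped in reverse order of registration. */
  @Override
  protected void serviceStop() {
    for (int i = children.size() - 1; i >= 0; i--) {
      children.get(i).stop();
    }
  }
}

/** Hypothetical child that records when it stops (where a stream factory would close). */
class RecordingService extends SimpleService {
  private final List<String> events;

  RecordingService(String name, List<String> events) { super(name); this.events = events; }

  @Override
  protected void serviceStop() { events.add("stopped: " + getName()); }
}

/** Hypothetical store: client manager added in the constructor, stream factory in serviceInit(). */
class SketchStore extends SimpleCompositeService {
  private final List<String> events;

  SketchStore(List<String> events) {
    super("S3AStore");
    this.events = events;
    addService(new RecordingService("client-manager", events));
  }

  @Override
  protected void serviceInit() {
    // register the stream factory first, then init all children
    addService(new RecordingService("stream-factory", events));
    super.serviceInit();
  }
}
```

Because children are stopped in reverse order, the stream factory registered after the client manager is stopped before it, so in this model the factory never outlives the clients it depends on.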
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -176,25 +207,90 @@ public class S3AStoreImpl implements S3AStore {
       RateLimiting writeRateLimiter,
       AuditSpanSource<AuditSpanS3A> auditSpanSource,
       @Nullable FileSystem.Statistics fsStatistics) {
-    this.storeContextFactory = requireNonNull(storeContextFactory);
+    super("S3AStore");
+    this.auditSpanSource = requireNonNull(auditSpanSource);
     this.clientManager = requireNonNull(clientManager);
     this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+    this.fsStatistics = fsStatistics;
     this.instrumentation = requireNonNull(instrumentation);
     this.statisticsContext = requireNonNull(statisticsContext);
+    this.storeContextFactory = requireNonNull(storeContextFactory);
     this.storageStatistics = requireNonNull(storageStatistics);
     this.readRateLimiter = requireNonNull(readRateLimiter);
     this.writeRateLimiter = requireNonNull(writeRateLimiter);
-    this.auditSpanSource = requireNonNull(auditSpanSource);
     this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
-    this.fsStatistics = fsStatistics;
+
     this.invoker = storeContext.getInvoker();
     this.bucket = storeContext.getBucket();
     this.requestFactory = storeContext.getRequestFactory();
+    addService(clientManager);
+  }
+
+  /**
+   * Create and initialize any subsidiary services, including the input stream factory.
+   * @param conf configuration
+   */
+  @Override
+  protected void serviceInit(final Configuration conf) throws Exception {
+
+    // create and register the stream factory, which will
+    // then follow the service lifecycle
+    objectInputStreamFactory = factoryFromConfig(conf);
+    addService(objectInputStreamFactory);
+
+    // init all child services, including the stream factory
+    super.serviceInit(conf);
+
+    // pass down extra information to the stream factory.
+    finishStreamFactoryInit();
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    initLocalDirAllocator();
+  }
+
+  /**
+   * Return the store path capabilities.
+   * If the object stream factory is non-null, hands off the
+   * query to that factory if not handled here.
+   * @param path path to query the capability of.
+   * @param capability non-null, non-empty string to query the path for support.
+   * @return known capabilities
+   */
+  @Override
+  public boolean hasPathCapability(final Path path, final String capability) {
+    switch (toLowerCase(capability)) {
+    case StreamCapabilities.IOSTATISTICS:

Review Comment:
   AAL doesn't support IOStatistics yet, so this should probably not return true without first checking whether the stream factory supports it.
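One way to act on this, sketched under assumptions: the store answers only the capabilities it provides itself and hands every stream-level probe, including IOStatistics, to the active stream factory. `CapabilityProbe` and `StoreCapabilities` are hypothetical names, the `Path` argument is dropped for brevity, and the capability strings used with it are placeholders rather than the real `StreamCapabilities` constants.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical probe, modelled loosely on StreamCapabilities.hasCapability(). */
@FunctionalInterface
interface CapabilityProbe {
  boolean hasCapability(String capability);
}

/** Hypothetical store-side check that never assumes stream features. */
class StoreCapabilities {
  /** Lower-case names of capabilities the store provides itself. */
  private final Set<String> storeCapabilities;
  private final CapabilityProbe streamFactory;

  StoreCapabilities(Set<String> storeCapabilities, CapabilityProbe streamFactory) {
    this.storeCapabilities = new HashSet<>(storeCapabilities);
    this.streamFactory = streamFactory;
  }

  boolean hasPathCapability(String capability) {
    String key = capability.toLowerCase(Locale.ENGLISH);
    if (storeCapabilities.contains(key)) {
      return true; // provided by the store, whatever the stream type
    }
    // Stream-level capabilities (e.g. IOStatistics) are delegated, so a
    // factory whose streams lack IOStatistics makes this return false.
    return streamFactory != null && streamFactory.hasCapability(key);
  }
}
```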



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.IOException;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A Factory for {@link ObjectInputStream} streams.
+ * <p>
+ * This class is instantiated during initialization of
+ * {@code S3AStore}, it then follows the same service
+ * lifecycle.
+ * <p>
+ * Note for maintainers: do try and keep this mostly stable.
+ * If new parameters need to be added, expand the
+ * {@link ObjectReadParameters} class, rather than change the
+ * interface signature.
+ */
+public interface ObjectInputStreamFactory
+    extends Service, StreamCapabilities {
+
+  /**
+   * Set extra initialization parameters.
+   * This MUST ONLY be invoked between {@code init()}
+   * and {@code start()}.
+   * @param factoryBindingParameters parameters for the factory binding
+   */
+  void bind(FactoryBindingParameters factoryBindingParameters);

Review Comment:
   Discussed offline: this should throw `IOException`, as `createAsyncClient()` throws `IOException`.
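A sketch of what that could look like, assuming client creation is the only checked failure: `bind()` declares `IOException`, so a failure while creating the client propagates unchanged instead of being wrapped, while the state check still rejects calls outside the `INITED` state. `ClientCreator` and `BindableStreamFactory` are hypothetical names; `ClientCreator.create()` stands in for `createAsyncClient()`, and the real method takes `FactoryBindingParameters`.

```java
import java.io.IOException;

/** Hypothetical stand-in for createAsyncClient(), which can throw IOException. */
@FunctionalInterface
interface ClientCreator {
  Object create() throws IOException;
}

/** Hypothetical factory showing bind() declaring IOException. */
class BindableStreamFactory {
  enum State { NOTINITED, INITED, STARTED }

  private State state = State.NOTINITED;
  private Object client; // placeholder for an async S3 client

  void init() { state = State.INITED; }

  void start() { state = State.STARTED; }

  boolean isBound() { return client != null; }

  /**
   * MUST only be invoked between init() and start().
   * @throws IOException if the client cannot be created
   */
  void bind(ClientCreator creator) throws IOException {
    if (state != State.INITED) {
      throw new IllegalStateException("Input stream factory is in wrong state: " + state);
    }
    client = creator.create(); // IOException reaches the caller unchanged
  }
}
```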



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Preconditions;
+
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
+
+/**
+ * Base implementation of {@link ObjectInputStreamFactory}.
+ */
+public abstract class AbstractObjectInputStreamFactory extends AbstractService
+    implements ObjectInputStreamFactory {
+
+  /**
+   * Parameters passed down in
+   * {@link #bind(FactoryBindingParameters)}.
+   */
+  private FactoryBindingParameters bindingParameters;
+
+  /**
+   * Callbacks.
+   */
+  private StreamFactoryCallbacks callbacks;
+
+  protected AbstractObjectInputStreamFactory(final String name) {
+    super(name);
+  }
+
+  /**
+   * Bind to the callbacks.
+   * <p>
+   * The base class checks service state then stores
+   * the callback interface.
+   * @param factoryBindingParameters parameters for the factory binding
+   */
+  @Override
+  public void bind(final FactoryBindingParameters factoryBindingParameters) {
+    // must only be invoked during service initialization
+    Preconditions.checkState(isInState(STATE.INITED),
+        "Input Stream factory %s is in wrong state: %s",
+        this, getServiceState());
+    bindingParameters = factoryBindingParameters;
+    callbacks = bindingParameters.callbacks();
+  }
+
+  /**
+   * Return base capabilities of all stream factories,
+   * defining what the base ObjectInputStream class does.
+   * This also includes the probe for stream type capability.
+   * @param capability string to query the stream support for.
+   * @return true if implemented
+   */
+  @Override
+  public boolean hasCapability(final String capability) {
+    switch (toLowerCase(capability)) {
+    case StreamCapabilities.IOSTATISTICS:
+    case StreamStatisticNames.STREAM_LEAKS:

Review Comment:
   Similar to above: I think we should let each stream factory declare its own capabilities rather than defining them in the parent class, as AAL does not support IOStatistics or stream leak detection right now.
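A sketch of that alternative, under stated assumptions: the base class answers `false` for optional capabilities and each factory declares only what its streams implement, so an analytics-style factory reports neither IOStatistics nor leak detection until it supports them. All class names are hypothetical, and the two capability strings are placeholders for the real `StreamCapabilities.IOSTATISTICS` and `StreamStatisticNames.STREAM_LEAKS` constants.

```java
import java.util.Locale;

/** Hypothetical base factory: declares no optional stream capabilities itself. */
abstract class BaseStreamFactory {
  // Placeholder values standing in for the real Hadoop constants.
  static final String IOSTATISTICS = "iostatistics";
  static final String STREAM_LEAKS = "stream_leaks";

  /** Subclasses override this to declare what their streams really support. */
  boolean hasCapability(String capability) {
    return false;
  }
}

/** Hypothetical factory whose streams collect IOStatistics and detect leaks. */
class ClassicLikeFactory extends BaseStreamFactory {
  @Override
  boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
    case IOSTATISTICS:
    case STREAM_LEAKS:
      return true;
    default:
      return super.hasCapability(capability);
    }
  }
}

/** Hypothetical analytics-style factory: inherits "false" for both until supported. */
class AnalyticsLikeFactory extends BaseStreamFactory {
}
```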





> S3A: InputStreams to be created by factory under S3AStore
> ---------------------------------------------------------
>
>                 Key: HADOOP-19354
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19354
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Migrate S3AInputStream creation into a factory pattern, and push it down into S3AStore.
> Proposed factories
> * default: whatever this release has as default
> * classic: current S3AInputStream
> * prefetch: prefetching
> * analytics: new analytics stream
> * other: reads a classname from another prop, instantiates.
> Also proposed
> * streams to implement a stream capability declaring their type (classic, prefetch, analytics, other).
> h2. Implementation
> All callbacks used by the stream are also to call directly into S3AStore.
> S3AFileSystem must not be invoked at all (if it is needed, the PR is not yet ready).
> Some interface from Instrumentation will be passed to the factory; this shall include a way to create new per-stream
> The factory shall implement org.apache.hadoop.service.Service; S3AStore shall do the same and become a subclass of CompositeService. It shall attach the factory as a child, so they can follow the same lifecycle. We shall do the same for anything else that gets pushed down.
> Everything related to stream creation, including creation of the factory itself, must move out of S3AFileSystem; this must be done in S3AStore.initialize().
> As usual, this will complicate mocking, but the streams themselves should not require significant changes.
> Testing.
> * The huge file tests should be tuned so that each of them always uses a different stream.
> * use -Dstream="factory name" to choose the factory, rather than -Dprefetch.
> * if not set, whatever is in auth-keys gets picked up.

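The factory selection proposed in the description can be sketched as a name lookup, under stated assumptions: names are matched case-insensitively, an unset or blank value means `default`, and `other` loads a class by name through its public no-argument constructor. `StreamKind`, `StreamFactorySelector` and the choice of `CLASSIC` as the default are hypothetical; the description leaves the default to each release and does not name the configuration properties.

```java
import java.util.Locale;

/** Hypothetical stream kinds matching the proposed factory names. */
enum StreamKind { CLASSIC, PREFETCH, ANALYTICS, OTHER }

final class StreamFactorySelector {
  /** Hypothetical: whatever this release treats as its default. */
  static final StreamKind DEFAULT_KIND = StreamKind.CLASSIC;

  private StreamFactorySelector() { }

  /** Map a configured name to a stream kind; null or blank selects the default. */
  static StreamKind resolve(String name) {
    if (name == null || name.trim().isEmpty()) {
      return DEFAULT_KIND;
    }
    switch (name.trim().toLowerCase(Locale.ENGLISH)) {
    case "default":
      return DEFAULT_KIND;
    case "classic":
      return StreamKind.CLASSIC;
    case "prefetch":
      return StreamKind.PREFETCH;
    case "analytics":
      return StreamKind.ANALYTICS;
    case "other":
      return StreamKind.OTHER;
    default:
      throw new IllegalArgumentException("Unknown stream type: " + name);
    }
  }

  /** For "other": instantiate the named class via its public no-arg constructor. */
  static <T> T instantiate(String className, Class<T> type)
      throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    return type.cast(clazz.getConstructor().newInstance());
  }
}
```

Rejecting unknown names, rather than silently falling back to the default, is one way to make a mistyped `-Dstream` value fail visibly in test runs.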


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
