[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-09-17 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072696


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers is independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+void setup() throws IOException;
+
+/** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+TierStorage createPartitionTierStorage();

Review Comment:
   Fixed the issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-09-17 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a common entrypoint of emitting records to tiered store. These records will be emitted to
+ * the {@link BufferAccumulator} to accumulate and transform into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+private final boolean isBroadcastOnly;
+
+private final int numSubpartitions;
+
+private final BufferAccumulator bufferAccumulator;
+
+public TieredStoreProducerImpl(
+StorageTier[] storageTiers,
+int numSubpartitions,
+int bufferSize,
+boolean isBroadcastOnly,
+@Nullable BufferCompressor bufferCompressor) {
+this.isBroadcastOnly = isBroadcastOnly;
+this.numSubpartitions = numSubpartitions;
+
+this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   I have updated the producer constructor and made this a constructor argument.
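
Passing the accumulator in through the constructor, instead of creating a `BufferAccumulatorImpl` inside it, lets a test inject a stub. The following is a minimal sketch of that pattern. `Accumulator`, `Producer` and `RecordingAccumulator` are hypothetical, simplified stand-ins and not Flink's actual classes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the BufferAccumulator interface.
interface Accumulator {
    void receive(ByteBuffer record, int subpartitionId);
}

// The producer receives its accumulator instead of constructing one itself.
class Producer {
    private final Accumulator accumulator;

    Producer(Accumulator accumulator) {
        this.accumulator = accumulator;
    }

    void emit(ByteBuffer record, int subpartitionId) {
        accumulator.receive(record, subpartitionId);
    }
}

// A recording stub that a unit test can inject to observe what the producer emits.
class RecordingAccumulator implements Accumulator {
    final List<Integer> receivedSubpartitions = new ArrayList<>();

    @Override
    public void receive(ByteBuffer record, int subpartitionId) {
        receivedSubpartitions.add(subpartitionId);
    }
}

public class InjectionDemo {
    public static void main(String[] args) {
        RecordingAccumulator stub = new RecordingAccumulator();
        Producer producer = new Producer(stub);
        producer.emit(ByteBuffer.wrap(new byte[] {1, 2, 3}), 2);
        System.out.println(stub.receivedSubpartitions); // prints [2]
    }
}
```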






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122783


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+private final boolean isBroadcastOnly;
+
+private final int numSubpartitions;
+
+private final BufferAccumulator bufferAccumulator;
+
+private final BufferCompressor bufferCompressor;
+
+private final List<TierProducerAgent> tierProducerAgents;
+
+public TieredStorageProducerClient(
+int numSubpartitions,
+boolean isBroadcastOnly,
+BufferAccumulator bufferAccumulator,
+@Nullable BufferCompressor bufferCompressor,
+List<TierProducerAgent> tierProducerAgents) {
+this.isBroadcastOnly = isBroadcastOnly;
+this.numSubpartitions = numSubpartitions;
+this.bufferAccumulator = bufferAccumulator;
+this.bufferCompressor = bufferCompressor;
+this.tierProducerAgents = tierProducerAgents;
+
+bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+}
+
+public void write(
+ByteBuffer record,
+TieredStorageSubpartitionId subpartitionId,
+Buffer.DataType dataType,
+boolean isBroadcast)
+throws IOException {
+
+if (isBroadcast && !isBroadcastOnly) {
+for (int i = 0; i < numSubpartitions; ++i) {
+bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+}
+} else {
+bufferAccumulator.receive(record, subpartitionId, dataType);
+}
+}
+
+public void close() {
+bufferAccumulator.close();
+tierProducerAgents.forEach(TierProducerAgent::close);
+}
+
+public void writeFinishedBuffers(

Review Comment:
   Renamed it.
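
In the quoted `write(...)`, a broadcast record is handed to the accumulator once per subpartition. Each copy is a `duplicate()` view, so reading one copy does not move the position of the others. The quoted loop passes the same `subpartitionId` on every iteration. The minimal sketch below targets index `i` instead, which is presumably the intent. It assumes plain `int` subpartition indices and uses a hypothetical `Sink` callback in place of `BufferAccumulator`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class BroadcastFanOut {

    // Hypothetical callback standing in for the buffer accumulator.
    interface Sink {
        void receive(ByteBuffer record, int subpartitionId);
    }

    static void write(
            ByteBuffer record,
            int subpartitionId,
            boolean isBroadcast,
            boolean isBroadcastOnly,
            int numSubpartitions,
            Sink sink) {
        if (isBroadcast && !isBroadcastOnly) {
            // Fan out: every subpartition gets its own view with an independent position.
            for (int i = 0; i < numSubpartitions; ++i) {
                sink.receive(record.duplicate(), i);
            }
        } else {
            // Unicast, or a broadcast-only partition that keeps a single copy.
            sink.receive(record, subpartitionId);
        }
    }

    public static void main(String[] args) {
        List<Integer> targets = new ArrayList<>();
        ByteBuffer record = ByteBuffer.wrap(new byte[] {7, 8});
        write(record, 0, true, false, 3, (r, id) -> targets.add(id));
        System.out.println(targets); // prints [0, 1, 2]
    }
}
```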






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+private static final long serialVersionUID = -948472905048472823L;
+
+/** ID represented by a byte array. */
+protected final byte[] id;
+
+/** Pre-calculated hash-code for acceleration. */
+protected final int hashCode;
+
+public TieredStorageAbstractId(byte[] id) {
+checkArgument(id != null, "Must be not null.");
+
+this.id = id;
+this.hashCode = Arrays.hashCode(id);
+}
+
+public TieredStorageAbstractId(int length) {
+checkArgument(length > 0, "Must be positive.");
+
+this.id = randomBytes(length);
+this.hashCode = Arrays.hashCode(id);
+}
+
+public byte[] getId() {

Review Comment:
   Renamed it.
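
`TieredStorageAbstractId` keeps the raw bytes and pre-computes `Arrays.hashCode(id)`. The hunk does not show the matching `equals`. The following is a minimal sketch of how such an ID class is typically completed, assuming equality by byte content. `ByteArrayId` is a hypothetical name. Using `SecureRandom` for the random constructor is also an assumption, because the implementation of `TieredStorageUtils.randomBytes` is not shown.

```java
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical value class modeled on the quoted constructor logic.
public class ByteArrayId {
    private static final SecureRandom RANDOM = new SecureRandom();

    private final byte[] id;

    // Computed once because IDs are frequently used as hash-map keys.
    private final int hashCode;

    public ByteArrayId(byte[] id) {
        if (id == null) {
            throw new IllegalArgumentException("Must be not null.");
        }
        this.id = id;
        this.hashCode = Arrays.hashCode(id);
    }

    public ByteArrayId(int length) {
        this(randomBytes(length));
    }

    private static byte[] randomBytes(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("Must be positive.");
        }
        byte[] bytes = new byte[length];
        RANDOM.nextBytes(bytes);
        return bytes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Compare array contents, not array identity.
        return Arrays.equals(id, ((ByteArrayId) o).id);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    public static void main(String[] args) {
        ByteArrayId a = new ByteArrayId(new byte[] {1, 2});
        ByteArrayId b = new ByteArrayId(new byte[] {1, 2});
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true
    }
}
```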






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502932


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);
+
+/**
+ * Before reading the segment data, the consumer calls the method to check whether the {@link
+ * StorageTier} has the segment.
+ */
+boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+/** Sets the metric group. */
+void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);
+
+void close();
+
+void release();

Review Comment:
   Fixed.
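
The `canStoreNextSegment` Javadoc implies that the writer probes the tiers before it starts each new segment. The following is a minimal sketch of that placement step. It assumes the tiers are tried in priority order (for example memory, then disk, then remote) and that the first tier to accept the segment wins. `Tier`, `selectTier` and `fixed` are hypothetical helpers, not part of the PR.

```java
import java.util.List;

public class TierSelection {

    // Hypothetical reduction of StorageTier to the method used for placement.
    interface Tier {
        String name();

        boolean canStoreNextSegment(int subpartitionId);
    }

    // Returns the first tier, in priority order, that can store the next segment.
    static Tier selectTier(List<Tier> tiersInPriorityOrder, int subpartitionId) {
        for (Tier tier : tiersInPriorityOrder) {
            if (tier.canStoreNextSegment(subpartitionId)) {
                return tier;
            }
        }
        throw new IllegalStateException(
                "No tier can store the next segment of subpartition " + subpartitionId);
    }

    // Test helper: a tier that always gives the same answer.
    static Tier fixed(String name, boolean accepts) {
        return new Tier() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public boolean canStoreNextSegment(int subpartitionId) {
                return accepts;
            }
        };
    }

    public static void main(String[] args) {
        List<Tier> tiers =
                List.of(fixed("memory", false), fixed("disk", true), fixed("remote", true));
        System.out.println(selectTier(tiers, 0).name()); // prints disk
    }
}
```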






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502777


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}
+
+/**
+ * Currently, only these tier combinations are supported. If the configured tiers is contained
+ * in the following combinations, an exception will be thrown.
+ */
+enum SupportedTierCombinations implements TieredStoreMode {
+MEMORY,
+MEMORY_DISK,
+MEMORY_DISK_REMOTE,

Review Comment:
   Fixed.
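
The `SupportedTierCombinations` Javadoc describes rejecting tier sets that are not supported. The following is a minimal sketch of that check. It assumes the configuration resolves to a set of `TierType` values and that a combination matches only on an exact set. It throws when the configured tiers are *not* one of the listed combinations. The `validate` helper and the combination-to-tiers mapping are hypothetical.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TierCombinationCheck {

    // IN_CACHE is the accumulator rather than a storage tier, so it is left out here.
    enum TierType { IN_MEM, IN_DISK, IN_REMOTE }

    enum SupportedTierCombinations { MEMORY, MEMORY_DISK, MEMORY_DISK_REMOTE }

    // Hypothetical mapping from each supported combination to its exact tier set.
    private static final Map<SupportedTierCombinations, Set<TierType>> TIERS =
            Map.of(
                    SupportedTierCombinations.MEMORY,
                    EnumSet.of(TierType.IN_MEM),
                    SupportedTierCombinations.MEMORY_DISK,
                    EnumSet.of(TierType.IN_MEM, TierType.IN_DISK),
                    SupportedTierCombinations.MEMORY_DISK_REMOTE,
                    EnumSet.of(TierType.IN_MEM, TierType.IN_DISK, TierType.IN_REMOTE));

    // Returns the matching combination, or throws if the configured tiers are unsupported.
    static SupportedTierCombinations validate(Set<TierType> configured) {
        for (Map.Entry<SupportedTierCombinations, Set<TierType>> e : TIERS.entrySet()) {
            if (e.getValue().equals(configured)) {
                return e.getKey();
            }
        }
        throw new IllegalArgumentException("Unsupported tier combination: " + configured);
    }

    public static void main(String[] args) {
        System.out.println(validate(EnumSet.of(TierType.IN_MEM, TierType.IN_DISK))); // prints MEMORY_DISK
    }
}
```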






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498829


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-07-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058942


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The test utils for the tiered storage tests. */
+public class TieredStorageTestUtils {

Review Comment:
   Removed it.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058942


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The test utils for the tiered storage tests. */
+public class TieredStorageTestUtils {

Review Comment:
   Removed it.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058705


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+private final TieredStorageMasterClient tieredStorageMasterClient;
+
+private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+public TieredInternalShuffleMaster(Configuration conf) {
+TieredStorageConfiguration tieredStorageConfiguration =
+createTieredStorageConfiguration(conf);
+ResourceRegistry resourceRegistry = new ResourceRegistry();
+List<TierMasterAgent> tierFactories =
+tieredStorageConfiguration.getTierFactories().stream()
+.map(TierFactory::createMasterAgent)
+.peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))
+.collect(Collectors.toList());
+this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
+this.jobPartitionIds = new HashMap<>();
+}
+
+public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) {
+jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID);
+tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+}
+
+public void releasePartition(ResultPartitionID resultPartitionID) {
+tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+}
+
+public void unregisterJob(JobID jobID) {
+List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID);
+if (resultPartitionIDs != null) {
+resultPartitionIDs.forEach(
+resultPartitionID ->
+tieredStorageMasterClient.releasePartition(
+convertId(resultPartitionID)));
+}
+}
+
+private TieredStorageConfiguration createTieredStorageConfiguration(Configuration conf) {
+// TODO, from the configuration, get the configured options(i.e., remote storage path, the
+// reserved storage size, etc.), then set them to the builder.

Review Comment:
   Replaced this with a `TieredStorageConfiguration#fromConfiguration`.
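
Moving the option parsing into a static `fromConfiguration` factory keeps the shuffle master free of key handling. The following is a minimal sketch of that pattern. It uses a plain `Map<String, String>` in place of Flink's `Configuration`, and the option keys and fields are hypothetical. The real option names and fields of `TieredStorageConfiguration` are not shown in this thread.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified configuration holder illustrating the static-factory pattern.
public final class TieredConfigSketch {
    // Hypothetical option keys, for illustration only.
    static final String REMOTE_PATH_KEY = "example.tiered.remote-storage-path";
    static final String RESERVED_SIZE_KEY = "example.tiered.reserved-storage-bytes";
    static final long DEFAULT_RESERVED_SIZE = 0L;

    private final String remoteStoragePath; // null when no remote tier is configured
    private final long reservedStorageBytes;

    private TieredConfigSketch(String remoteStoragePath, long reservedStorageBytes) {
        this.remoteStoragePath = remoteStoragePath;
        this.reservedStorageBytes = reservedStorageBytes;
    }

    // All option parsing happens here, so callers never read raw keys themselves.
    static TieredConfigSketch fromConfiguration(Map<String, String> conf) {
        String remotePath = conf.get(REMOTE_PATH_KEY);
        long reserved =
                Optional.ofNullable(conf.get(RESERVED_SIZE_KEY))
                        .map(Long::parseLong)
                        .orElse(DEFAULT_RESERVED_SIZE);
        return new TieredConfigSketch(remotePath, reserved);
    }

    Optional<String> remoteStoragePath() {
        return Optional.ofNullable(remoteStoragePath);
    }

    long reservedStorageBytes() {
        return reservedStorageBytes;
    }

    public static void main(String[] args) {
        TieredConfigSketch c = fromConfiguration(Map.of(RESERVED_SIZE_KEY, "1024"));
        System.out.println(c.reservedStorageBytes() + " " + c.remoteStoragePath().isPresent()); // prints 1024 false
    }
}
```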






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187058204


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+private final TieredStorageMasterClient tieredStorageMasterClient;
+
+private final Map> jobPartitionIds;
+
+public TieredInternalShuffleMaster(Configuration conf) {
+TieredStorageConfiguration tieredStorageConfiguration =
+createTieredStorageConfiguration(conf);
+ResourceRegistry resourceRegistry = new ResourceRegistry();
+List tierFactories =
+tieredStorageConfiguration.getTierFactories().stream()
+.map(TierFactory::createMasterAgent)
+.peek(tierMasterAgent -> 
tierMasterAgent.setup(resourceRegistry))
+.collect(Collectors.toList());
+this.tieredStorageMasterClient = new 
TieredStorageMasterClient(tierFactories);
+this.jobPartitionIds = new HashMap<>();
+}
+
+public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) 
{
+jobPartitionIds.computeIfAbsent(jobID, ignore -> new 
ArrayList<>()).add(resultPartitionID);
+tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+}
+
+public void releasePartition(ResultPartitionID resultPartitionID) {
+
tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+}
+
+public void unregisterJob(JobID jobID) {
+List resultPartitionIDs = 
jobPartitionIds.remove(jobID);
+if (resultPartitionIDs != null) {
+resultPartitionIDs.forEach(
+resultPartitionID ->
+tieredStorageMasterClient.releasePartition(
+convertId(resultPartitionID)));
+}
+}

Review Comment:
   OK. Because we cannot know the exact job ID when releasing a result partition, I added a `partitionJobIds` map to record the job ID of each `resultPartitionID` to be released.
   
   1. When adding a partition, add the partition ID and job ID to `jobPartitionIds` and `partitionJobIds`, respectively.
   2. When releasing a partition, remove the partition ID from both `jobPartitionIds` and `partitionJobIds`.
   3. When unregistering a job, remove all the partition IDs of the job from `jobPartitionIds` and `partitionJobIds`.
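   
   A minimal sketch of this two-map bookkeeping, assuming plain `String` IDs in place of `JobID` and `ResultPartitionID`, and a hypothetical `releaseInTiers` callback standing in for the call into `TieredStorageMasterClient`. It illustrates the idea only and is not the code from the PR:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Consumer;
   
   /** Sketch only: String IDs stand in for JobID and ResultPartitionID. */
   class PartitionBookkeeping {
       private final Map<String, List<String>> jobPartitionIds = new HashMap<>();
       private final Map<String, String> partitionJobIds = new HashMap<>();
       // Hypothetical hook standing in for releasing the partition in every tier.
       private final Consumer<String> releaseInTiers;
   
       PartitionBookkeeping(Consumer<String> releaseInTiers) {
           this.releaseInTiers = releaseInTiers;
       }
   
       void addPartition(String jobId, String partitionId) {
           jobPartitionIds.computeIfAbsent(jobId, ignored -> new ArrayList<>()).add(partitionId);
           partitionJobIds.put(partitionId, jobId);
       }
   
       void releasePartition(String partitionId) {
           // The caller does not pass the job ID, so look it up in the reverse map.
           String jobId = partitionJobIds.remove(partitionId);
           if (jobId != null) {
               List<String> partitions = jobPartitionIds.get(jobId);
               partitions.remove(partitionId);
               if (partitions.isEmpty()) {
                   jobPartitionIds.remove(jobId);
               }
           }
           releaseInTiers.accept(partitionId);
       }
   
       void unregisterJob(String jobId) {
           // Drop the job entry and clean up every partition it still owns.
           List<String> partitions = jobPartitionIds.remove(jobId);
           if (partitions != null) {
               for (String partitionId : partitions) {
                   partitionJobIds.remove(partitionId);
                   releaseInTiers.accept(partitionId);
               }
           }
       }
   
       int trackedJobs() {
           return jobPartitionIds.size();
       }
   
       int trackedPartitions() {
           return partitionJobIds.size();
       }
   }
   ```
   
   The reverse map lets `releasePartition` find the owning job in constant time instead of scanning every job's partition list.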
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187054589


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##
@@ -0,0 +1,85 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                createTieredStorageConfiguration(conf);
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(TierFactory::createMasterAgent)
+                        .peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry))

Review Comment:
   Fixed it.
   Added an argument to `TierFactory#createMasterAgent` so that the separate `setup` method is no longer needed.
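   
   A minimal sketch of that change, using hypothetical simplified types (`SketchRegistry`, `SketchMasterAgent`, `SketchTierFactory`) rather than the real Flink interfaces. Passing the registry into the factory method means each agent is fully initialized when it is created, so the stream no longer relies on a side-effecting `peek` (which the `Stream#peek` Javadoc describes as existing mainly to support debugging):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.stream.Collectors;
   
   /** Hypothetical stand-in for the shared resource registry. */
   class SketchRegistry {
       final List<String> registered = new ArrayList<>();
   }
   
   /** Hypothetical master-side agent that receives the registry through its constructor. */
   class SketchMasterAgent {
       private final String tierName;
       private final SketchRegistry registry;
   
       SketchMasterAgent(String tierName, SketchRegistry registry) {
           this.tierName = tierName;
           this.registry = registry;
       }
   
       void addPartition(String partitionId) {
           registry.registered.add(tierName + ":" + partitionId);
       }
   }
   
   /** Hypothetical factory: the registry is passed in instead of via a later setup() call. */
   interface SketchTierFactory {
       SketchMasterAgent createMasterAgent(SketchRegistry registry);
   }
   
   class SketchMasterAgents {
       static List<SketchMasterAgent> create(
               List<SketchTierFactory> factories, SketchRegistry registry) {
           // A plain map() is enough; no peek(agent -> agent.setup(registry)) side effect needed.
           return factories.stream()
                   .map(factory -> factory.createMasterAgent(registry))
                   .collect(Collectors.toList());
       }
   }
   ```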






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187053970


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredResource.java:
##
@@ -0,0 +1,26 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/** The resource (e.g., local files, remote storage files, etc.) for the Tiered Storage. */
+public interface TieredResource {

Review Comment:
   Renamed it.
   I also renamed `ResourceRegistry` to `TieredStorageResourceRegistry`.
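   
   For context, a registry like this can be sketched as a map from an owner ID (for example, a partition ID) to the resources registered for it, all released together when the owner goes away. The shape is an assumption based on the `registerResource(partitionId, ...)` call in the `TieredStorageMasterClient` diff; the names below are hypothetical and are not the actual `TieredStorageResourceRegistry` API:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   /** Hypothetical, simplified stand-in for TieredResource: anything that can be released. */
   interface SketchTieredResource {
       void release();
   }
   
   /** Hypothetical registry that releases every resource of an owner at once. */
   class SketchResourceRegistry {
       private final Map<String, List<SketchTieredResource>> resourcesByOwner = new HashMap<>();
   
       void registerResource(String ownerId, SketchTieredResource resource) {
           resourcesByOwner.computeIfAbsent(ownerId, ignored -> new ArrayList<>()).add(resource);
       }
   
       /** Releases and forgets all resources registered for the owner, if any. */
       void releaseResourcesOf(String ownerId) {
           List<SketchTieredResource> resources = resourcesByOwner.remove(ownerId);
           if (resources != null) {
               resources.forEach(SketchTieredResource::release);
           }
       }
   
       boolean hasResources(String ownerId) {
           return resourcesByOwner.containsKey(ownerId);
       }
   }
   ```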






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187053105


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {

Review Comment:
   Fixed. I have moved the internal types and internal methods to the end of the class.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1187052528


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##
@@ -23,6 +23,9 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;

Review Comment:
   OK, we can remove the dependencies in a follow-up ticket.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180124978


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##
@@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster
 
 private final int networkBufferSize;
 
+private final ResourceRegistry resourceRegistry;

Review Comment:
   Added a `TieredInternalShuffleMaster` wrapper class.
   All the tiered storage operations with the shuffle master should be wrapped in this class.
   
   The `ResourceRegistry` is now a local variable in the constructor of `TieredInternalShuffleMaster`.
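   
   A minimal sketch of this wrapper arrangement, with hypothetical names throughout. The outer shuffle master only holds the wrapper and forwards partition lifecycle calls to it, and the registry is a local variable inside the wrapper's constructor, handed only to the components built there. Creating the wrapper only when tiered storage is enabled is an assumption made for the example:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   /** Hypothetical client that needs the registry; stands in for the master-side tiered client. */
   class SketchTieredClient {
       private final Set<String> registry;
   
       SketchTieredClient(Set<String> registry) {
           this.registry = registry;
       }
   
       void addPartition(String partitionId) {
           registry.add(partitionId);
       }
   
       void releasePartition(String partitionId) {
           registry.remove(partitionId);
       }
   
       int trackedPartitions() {
           return registry.size();
       }
   }
   
   /** Hypothetical wrapper: all tiered-storage state lives here, not in the outer master. */
   class SketchTieredInternalShuffleMaster {
       private final SketchTieredClient client;
   
       SketchTieredInternalShuffleMaster() {
           // The registry is a constructor-local variable; only components built here see it.
           Set<String> registry = new HashSet<>();
           this.client = new SketchTieredClient(registry);
       }
   
       void addPartition(String partitionId) {
           client.addPartition(partitionId);
       }
   
       void releasePartition(String partitionId) {
           client.releasePartition(partitionId);
       }
   
       int trackedPartitions() {
           return client.trackedPartitions();
       }
   }
   
   /** Hypothetical outer shuffle master that only forwards to the wrapper when it exists. */
   class SketchShuffleMaster {
       private final SketchTieredInternalShuffleMaster tiered; // null when tiered storage is off
   
       SketchShuffleMaster(boolean tieredStorageEnabled) {
           this.tiered = tieredStorageEnabled ? new SketchTieredInternalShuffleMaster() : null;
       }
   
       void registerPartition(String partitionId) {
           if (tiered != null) {
               tiered.addPartition(partitionId);
           }
       }
   
       void releasePartition(String partitionId) {
           if (tiered != null) {
               tiered.releasePartition(partitionId);
           }
       }
   
       int trackedTieredPartitions() {
           return tiered == null ? 0 : tiered.trackedPartitions();
       }
   }
   ```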






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122783


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -0,0 +1,97 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {
+            for (int i = 0; i < numSubpartitions; ++i) {
+                bufferAccumulator.receive(
+                        record.duplicate(), new TieredStorageSubpartitionId(i), dataType);
+            }
+        } else {
+            bufferAccumulator.receive(record, subpartitionId, dataType);
+        }
+    }
+
+    public void close() {
+        bufferAccumulator.close();
+        tierProducerAgents.forEach(TierProducerAgent::close);
+    }
+
+    public void writeFinishedBuffers(

Review Comment:
   Renamed it.




[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122535


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -0,0 +1,97 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Client of the Tiered Storage used by the producer. */
+public class TieredStorageProducerClient {
+    private final boolean isBroadcastOnly;
+
+    private final int numSubpartitions;
+
+    private final BufferAccumulator bufferAccumulator;
+
+    private final BufferCompressor bufferCompressor;
+
+    private final List<TierProducerAgent> tierProducerAgents;
+
+    public TieredStorageProducerClient(
+            int numSubpartitions,
+            boolean isBroadcastOnly,
+            BufferAccumulator bufferAccumulator,
+            @Nullable BufferCompressor bufferCompressor,
+            List<TierProducerAgent> tierProducerAgents) {
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.numSubpartitions = numSubpartitions;
+        this.bufferAccumulator = bufferAccumulator;
+        this.bufferCompressor = bufferCompressor;
+        this.tierProducerAgents = tierProducerAgents;
+
+        bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers);
+    }
+
+    public void write(
+            ByteBuffer record,
+            TieredStorageSubpartitionId subpartitionId,
+            Buffer.DataType dataType,
+            boolean isBroadcast)
+            throws IOException {
+
+        if (isBroadcast && !isBroadcastOnly) {

Review Comment:
   Added a doc comment.
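   
   The branch quoted above routes broadcast records: unless the partition is broadcast-only, a broadcast record is handed to the accumulator once per subpartition. A minimal sketch of that fan-out, assuming a hypothetical `SketchAccumulator` in place of `BufferAccumulator` and plain `int` subpartition indexes. Each copy is a `ByteBuffer#duplicate()`, which shares the bytes but has its own position and limit, and each copy targets its own subpartition index:
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical stand-in for BufferAccumulator#receive. */
   interface SketchAccumulator {
       void receive(ByteBuffer record, int subpartitionIndex);
   }
   
   /** Hypothetical producer-side routing of normal and broadcast records. */
   class SketchProducerClient {
       private final int numSubpartitions;
       private final boolean isBroadcastOnly;
       private final SketchAccumulator accumulator;
   
       SketchProducerClient(
               int numSubpartitions, boolean isBroadcastOnly, SketchAccumulator accumulator) {
           this.numSubpartitions = numSubpartitions;
           this.isBroadcastOnly = isBroadcastOnly;
           this.accumulator = accumulator;
       }
   
       void write(ByteBuffer record, int subpartitionIndex, boolean isBroadcast) {
           if (isBroadcast && !isBroadcastOnly) {
               // One copy per subpartition; duplicate() shares bytes but not position/limit.
               for (int i = 0; i < numSubpartitions; i++) {
                   accumulator.receive(record.duplicate(), i);
               }
           } else {
               // Normal records, and broadcasts in a broadcast-only partition, are written once.
               accumulator.receive(record, subpartitionIndex);
           }
       }
   }
   ```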






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180122114


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##
@@ -0,0 +1,62 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {
+        jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId);
+        resourceRegistry.registerResource(
+                partitionId, () -> tiers.forEach(TierMasterAgent::release));

Review Comment:
   These methods have been renamed to `addPartition` and `releasePartition`.
   
   Each tier decides for itself what to do, i.e., whether to register or release resources. (This code is not included yet; it will be added when each tier is implemented.)
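   
   A minimal sketch of this delegation, assuming a hypothetical `SketchTierMasterAgent` interface with `addPartition`/`releasePartition` (the real per-tier logic is not part of this PR). The master client only forwards each call to every tier, and each tier decides what registering or releasing means for it:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   /** Hypothetical per-tier master agent; each tier decides what add/release means for it. */
   interface SketchTierMasterAgent {
       void addPartition(String partitionId);
   
       void releasePartition(String partitionId);
   }
   
   /** Hypothetical tier that only remembers which partitions it currently holds. */
   class RecordingTierAgent implements SketchTierMasterAgent {
       final Set<String> partitions = new HashSet<>();
   
       @Override
       public void addPartition(String partitionId) {
           partitions.add(partitionId);
       }
   
       @Override
       public void releasePartition(String partitionId) {
           partitions.remove(partitionId);
       }
   }
   
   /** Hypothetical master client that forwards partition lifecycle calls to every tier. */
   class SketchTieredStorageMasterClient {
       private final List<SketchTierMasterAgent> tiers;
   
       SketchTieredStorageMasterClient(List<SketchTierMasterAgent> tiers) {
           this.tiers = new ArrayList<>(tiers);
       }
   
       void addPartition(String partitionId) {
           tiers.forEach(tier -> tier.addPartition(partitionId));
       }
   
       void releasePartition(String partitionId) {
           tiers.forEach(tier -> tier.releasePartition(partitionId));
       }
   }
   ```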






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180118466


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##
@@ -0,0 +1,62 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;

Review Comment:
   OK, good point. Added a `TieredInternalShuffleMaster` wrapper class.  
   All the tiered storage operations with the shuffle master should be wrapped 
in this class.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180116664


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##
@@ -0,0 +1,62 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Client of the Tiered Storage used by the master. */
+public class TieredStorageMasterClient {
+
+    private final List<TierMasterAgent> tiers;
+
+    private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds;
+
+    private final ResourceRegistry resourceRegistry;
+
+    public TieredStorageMasterClient(
+            List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) {
+        this.tiers = tiers;
+        this.resourceRegistry = resourceRegistry;
+        this.jobPartitionIds = new HashMap<>();
+    }
+
+    public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) {

Review Comment:
   A topic ID is not needed here, because the result partition ID is already unique.
   
   The result partition ID is unique because it contains an `IntermediateResultPartitionID` field, and that `IntermediateResultPartitionID` already includes the `IntermediateDataSetID` information.
   
   In addition, these methods have been renamed to `addPartition` and `releasePartition`.
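   
   To illustrate the uniqueness argument with hypothetical, simplified ID classes (not Flink's actual ID types): when the partition ID embeds the data set ID as one of its components, two partitions from different data sets never compare equal, even with the same partition number and producer. The producer-attempt component below is included only for illustration:
   
   ```java
   import java.util.HashSet;
   import java.util.Objects;
   import java.util.Set;
   
   /** Hypothetical simplified partition ID: (data set ID, partition number). */
   final class SketchIntermediateResultPartitionId {
       final String dataSetId;
       final int partitionNum;
   
       SketchIntermediateResultPartitionId(String dataSetId, int partitionNum) {
           this.dataSetId = dataSetId;
           this.partitionNum = partitionNum;
       }
   
       @Override
       public boolean equals(Object o) {
           if (!(o instanceof SketchIntermediateResultPartitionId)) {
               return false;
           }
           SketchIntermediateResultPartitionId that = (SketchIntermediateResultPartitionId) o;
           return partitionNum == that.partitionNum && dataSetId.equals(that.dataSetId);
       }
   
       @Override
       public int hashCode() {
           return Objects.hash(dataSetId, partitionNum);
       }
   }
   
   /** Hypothetical simplified result partition ID: (partition ID, producer attempt ID). */
   final class SketchResultPartitionId {
       final SketchIntermediateResultPartitionId partitionId;
       final String producerAttemptId;
   
       SketchResultPartitionId(
               SketchIntermediateResultPartitionId partitionId, String producerAttemptId) {
           this.partitionId = partitionId;
           this.producerAttemptId = producerAttemptId;
       }
   
       @Override
       public boolean equals(Object o) {
           if (!(o instanceof SketchResultPartitionId)) {
               return false;
           }
           SketchResultPartitionId that = (SketchResultPartitionId) o;
           return partitionId.equals(that.partitionId)
                   && producerAttemptId.equals(that.producerAttemptId);
       }
   
       @Override
       public int hashCode() {
           return Objects.hash(partitionId, producerAttemptId);
       }
   }
   ```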






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180110166


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##
@@ -29,6 +34,18 @@ public class TieredStorageUtils {
 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
 };
 
+    public static List<TierFactory> getTierFactoriesFromConfiguration() {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                TieredStorageConfiguration.builder()
+                        .setTierTypes(TieredStorageConfiguration.memoryDiskTierTypes())
+                        .build();

Review Comment:
   Removed `getTierFactoriesFromConfiguration` from `TieredStorageUtils`. In addition, `TieredStorageUtils` is no longer needed, so I removed it as well. Now the factories can be obtained with `TieredStorageConfiguration#getTierFactories`.
   
   Removed `setTierTypes`, because the first version does not need to set the tier types from outside the configuration.
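   
   A minimal sketch of this arrangement, with hypothetical names modeled on the quoted `TieredStorageConfiguration` diff. The configuration chooses its tier types internally when it is built and exposes only `getTierFactories()`, so callers need neither a `setTierTypes` setter nor a separate utility method:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical configuration that derives its tier factories from internal tier types. */
   final class SketchTieredStorageConfiguration {
   
       enum TierType {
           IN_MEM,
           IN_DISK,
           IN_REMOTE,
       }
   
       /** Hypothetical factory; here it only reports which tier it would create. */
       interface TierFactory {
           String tierName();
       }
   
       private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
               new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
   
       private final TierType[] tierTypes;
   
       private SketchTieredStorageConfiguration(TierType[] tierTypes) {
           this.tierTypes = tierTypes;
       }
   
       /** Tier types are decided here, so there is no public setTierTypes(). */
       static SketchTieredStorageConfiguration build() {
           return new SketchTieredStorageConfiguration(getDefaultTierTypes());
       }
   
       private static TierType[] getDefaultTierTypes() {
           // Return a copy so callers cannot mutate the shared default array.
           return DEFAULT_MEMORY_DISK_TIER_TYPES.clone();
       }
   
       List<TierFactory> getTierFactories() {
           List<TierFactory> factories = new ArrayList<>();
           for (TierType tierType : tierTypes) {
               factories.add(() -> tierType.name());
           }
           return factories;
       }
   }
   ```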






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180104030


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##
@@ -0,0 +1,69 @@
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+    private enum TierType {
+        IN_MEM,
+        IN_DISK,
+        IN_REMOTE,
+    }
+
+    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+    private final TierType[] tierTypes;
+
+    private TieredStorageConfiguration(TierType[] tierTypes) {
+        this.tierTypes = tierTypes;
+    }
+
+    public int[] getTierIndexes() {
+        int[] tierIndexes = new int[tierTypes.length];
+        for (int i = 0; i < tierTypes.length; i++) {
+            tierIndexes[i] = i;
+        }
+        return tierIndexes;
+    }
+
+    public static TierType[] memoryDiskTierTypes() {

Review Comment:
   Removed it.
   In the first version only the default tier types are supported, so I removed `memoryDiskTierTypes`.
   
   Currently the default value is fixed, so I changed `TieredStorageConfiguration#getDefaultTierTypes` into a private method and get the tier types when building the configuration.
   
   There is a TODO here: `getDefaultTierTypes` should take a new argument in the future. After the option for the remote storage path is added, the method will look like this:
   ```
   private static TierType[] getDefaultTierTypes(String remoteStorageHomePath) {
       return remoteStorageHomePath == null
               ? DEFAULT_MEMORY_DISK_TIER_TYPES
               : DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES;
   }
   ```
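A minimal, self-contained sketch of the planned behavior, assuming the remote variant simply appends `IN_REMOTE` after memory and disk (the contents of `DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES` are not shown in the comment). The class name `TieredStorageConfigurationSketch`, the factory `fromRemoteStorageHomePath`, and `tierNames()` are hypothetical and only illustrate choosing the tier types once, at build time.

```java
import java.util.Arrays;

/** Hypothetical sketch: tier types are chosen once, when the configuration is built. */
final class TieredStorageConfigurationSketch {

    // Kept private, mirroring the enum in the quoted diff.
    private enum TierType {
        IN_MEM,
        IN_DISK,
        IN_REMOTE,
    }

    private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
            new TierType[] {TierType.IN_MEM, TierType.IN_DISK};

    // Assumption: the remote variant appends IN_REMOTE after memory and disk.
    private static final TierType[] DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES =
            new TierType[] {TierType.IN_MEM, TierType.IN_DISK, TierType.IN_REMOTE};

    private final TierType[] tierTypes;

    private TieredStorageConfigurationSketch(TierType[] tierTypes) {
        this.tierTypes = tierTypes;
    }

    /** Builds a configuration; a null path means no remote tier is configured. */
    static TieredStorageConfigurationSketch fromRemoteStorageHomePath(String remoteStorageHomePath) {
        return new TieredStorageConfigurationSketch(getDefaultTierTypes(remoteStorageHomePath));
    }

    private static TierType[] getDefaultTierTypes(String remoteStorageHomePath) {
        return remoteStorageHomePath == null
                ? DEFAULT_MEMORY_DISK_TIER_TYPES
                : DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES;
    }

    /** Exposes tier names only, so the enum itself stays private. */
    String[] tierNames() {
        return Arrays.stream(tierTypes).map(TierType::name).toArray(String[]::new);
    }
}
```

Keeping the enum private means callers never depend on the set of tier types, so adding a tier later only touches this class.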
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180097369


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
+/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */
+public class TieredStorageIdMappingUtils {
+
+public static TieredStorageJobId convertId(JobID jobID) {
+return new TieredStorageJobId(jobID.getBytes());
+}
+
+public static JobID convertId(TieredStorageJobId tieredStorageJobId) {
+return new JobID(tieredStorageJobId.getId());
+}
+
+public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) {
+return new TieredStorageTopicId(intermediateDataSetID.getBytes());
+}
+
+public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) {
+return new IntermediateDataSetID(new AbstractID(topicId.getId()));
+}
+
+public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) {
+ByteBuf byteBuf = Unpooled.buffer();

Review Comment:
   Resolved.
   Added a `getBytes` method to `ResultPartitionID`.
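A self-contained sketch of the idea, not the actual `ResultPartitionID#getBytes`: it assumes the ID's byte form is simply the concatenation of its two parts (the real ID combines an `IntermediateResultPartitionID` with an `ExecutionAttemptID`, but the PR's exact layout is not shown here). `CompositePartitionIdSketch` is a hypothetical stand-in, and it uses `java.nio.ByteBuffer` rather than the Netty `ByteBuf` from the quoted diff.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical stand-in for a two-part partition ID; not Flink's ResultPartitionID. */
final class CompositePartitionIdSketch {
    private final byte[] partitionPart;
    private final byte[] producerPart;

    CompositePartitionIdSketch(byte[] partitionPart, byte[] producerPart) {
        // Defensive copies keep the ID immutable.
        this.partitionPart = partitionPart.clone();
        this.producerPart = producerPart.clone();
    }

    /** Concatenates both parts, so a tiered-storage ID can wrap them directly. */
    byte[] getBytes() {
        return ByteBuffer.allocate(partitionPart.length + producerPart.length)
                .put(partitionPart)
                .put(producerPart)
                .array();
    }

    /** Splits the bytes back, assuming the caller knows the length of the first part. */
    static CompositePartitionIdSketch fromBytes(byte[] bytes, int partitionPartLength) {
        if (partitionPartLength < 0 || partitionPartLength > bytes.length) {
            throw new IllegalArgumentException("Invalid partition part length.");
        }
        return new CompositePartitionIdSketch(
                Arrays.copyOfRange(bytes, 0, partitionPartLength),
                Arrays.copyOfRange(bytes, partitionPartLength, bytes.length));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CompositePartitionIdSketch)) {
            return false;
        }
        CompositePartitionIdSketch that = (CompositePartitionIdSketch) o;
        return Arrays.equals(partitionPart, that.partitionPart)
                && Arrays.equals(producerPart, that.producerPart);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(partitionPart) + Arrays.hashCode(producerPart);
    }
}
```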






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180096713


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+private enum TierType {
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}
+
+private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES =
+new TierType[] {TierType.IN_MEM, TierType.IN_DISK};
+
+private final TierType[] tierTypes;
+
+private TieredStorageConfiguration(TierType[] tierTypes) {
+this.tierTypes = tierTypes;
+}
+
+public int[] getTierIndexes() {
+int[] tierIndexes = new int[tierTypes.length];
+for (int i = 0; i < tierTypes.length; i++) {
+tierIndexes[i] = i;
+}
+return tierIndexes;
+}

Review Comment:
   Removed it, because `TieredStorageConfiguration#getTierFactories` can be used to get the factories, which makes this method unnecessary.
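A minimal sketch of why a separate index accessor becomes redundant, assuming the configuration keeps its tier factories in tier order: a tier's index is then just its position in the list. `TierFactorySketch` and `TierFactoriesConfigSketch` are hypothetical names; the PR's real factory interface is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical factory abstraction; the real interface may look different. */
interface TierFactorySketch {
    String tierName();
}

/** Hypothetical configuration holding factories in tier order. */
final class TierFactoriesConfigSketch {
    private final List<TierFactorySketch> tierFactories;

    TierFactoriesConfigSketch(List<TierFactorySketch> tierFactories) {
        // Copy and wrap so callers cannot reorder the tiers afterwards.
        this.tierFactories = Collections.unmodifiableList(new ArrayList<>(tierFactories));
    }

    /** The list order is the tier order, so a tier's index is its position here. */
    List<TierFactorySketch> getTierFactories() {
        return tierFactories;
    }
}
```

A caller that needs an index can iterate with a counter over `getTierFactories()` instead of asking for a separate `int[]`.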






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180094931


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageJobId.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+
+/**
+ * Identifier of a job.
+ *
+ * A job is equivalent to a job in Flink.
+ */
+public class TieredStorageJobId extends TieredStorageAbstractId {

Review Comment:
   Removed it. The partition ID is now used directly to release resources, because the partition ID is unique.
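A minimal sketch of releasing resources keyed only by a unique partition ID, so no job ID is needed to scope the lookup. `PartitionResourceRegistrySketch` is hypothetical, and a `String` stands in for `TieredStoragePartitionId`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry: resources are tracked and released per partition ID. */
final class PartitionResourceRegistrySketch {
    // Keyed by partition ID alone; this is safe only because partition IDs are unique.
    private final Map<String, List<Runnable>> releasers = new HashMap<>();

    synchronized void register(String partitionId, Runnable releaser) {
        releasers.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(releaser);
    }

    /** Runs and forgets every releaser of one partition; returns how many ran. */
    int releaseResources(String partitionId) {
        List<Runnable> toRun;
        synchronized (this) {
            toRun = releasers.remove(partitionId);
        }
        if (toRun == null) {
            return 0;
        }
        // Run outside the lock so slow cleanup does not block registrations.
        toRun.forEach(Runnable::run);
        return toRun.size();
    }
}
```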



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Configurations for the Tiered Storage. */
+public class TieredStorageConfiguration {
+
+private enum TierType {
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180093470


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+private static final char[] HEX_CHARS = {
+'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+};
+
+public static byte[] randomBytes(int length) {

Review Comment:
   Removed it.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+private static final long serialVersionUID = -948472905048472823L;
+
+/** ID represented by a byte array. */
+protected final byte[] id;
+
+/** Pre-calculated hash-code for acceleration. */
+protected final int hashCode;
+
+public TieredStorageAbstractId(byte[] id) {
+checkArgument(id != null, "Must be not null.");
+
+this.id = id;
+this.hashCode = Arrays.hashCode(id);
+}
+
+public TieredStorageAbstractId(int length) {
+checkArgument(length > 0, "Must be positive.");
+
+this.id = randomBytes(length);
+this.hashCode = Arrays.hashCode(id);
+}
+
+public byte[] getId() {

Review Comment:
   Renamed it.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092466


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utils for reading or writing to tiered store. */
+public class TieredStorageUtils {
+
+private static final String TIER_STORE_DIR = "tiered-store";
+
+private static final char[] HEX_CHARS = {
+'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+};
+
+public static byte[] randomBytes(int length) {
+checkArgument(length > 0, "Must be positive.");
+
+Random random = new Random();
+byte[] bytes = new byte[length];
+random.nextBytes(bytes);
+return bytes;
+}
+
+public static String bytesToHexString(byte[] bytes) {

Review Comment:
   Removed; `StringUtils#byteToHexString` is used instead.
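A self-contained sketch of a byte-array ID like the quoted `TieredStorageAbstractId`: the hash code is computed once (the diff's "pre-calculated hash-code for acceleration"), equality compares contents, and `toString` renders hex. To stay free of Flink dependencies it inlines a lowercase hex encoder instead of calling `StringUtils#byteToHexString`; the class name `BytesIdSketch` and the defensive copy are this sketch's own choices.

```java
import java.util.Arrays;

/** Hypothetical byte-array ID: cached hash code, value equality, hex toString. */
final class BytesIdSketch {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    private final byte[] bytes;

    // Computed once, since IDs are frequently used as map keys.
    private final int hashCode;

    BytesIdSketch(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("ID bytes must be non-empty.");
        }
        // Copy so later changes to the caller's array cannot invalidate the cached hash.
        this.bytes = bytes.clone();
        this.hashCode = Arrays.hashCode(this.bytes);
    }

    byte[] getBytes() {
        return bytes.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BytesIdSketch)) {
            return false;
        }
        BytesIdSketch that = (BytesIdSketch) o;
        // Cheap hash comparison first, full content comparison second.
        return hashCode == that.hashCode && Arrays.equals(bytes, that.bytes);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    /** Two lowercase hex digits per byte. */
    @Override
    public String toString() {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int v = bytes[i] & 0xFF;
            out[2 * i] = HEX[v >>> 4];
            out[2 * i + 1] = HEX[v & 0x0F];
        }
        return new String(out);
    }
}
```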



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-28 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1180092087


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The abstract unique identification for the Tiered Storage. */
+public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable {
+
+private static final long serialVersionUID = -948472905048472823L;

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164073848


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+/** Setups the {@link TierStorage}. */
+void setup() throws IOException;
+
+/** Emits the finished {@link Buffer} to a specific subpartition. */
+boolean emit(
+int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)
+throws IOException;
+
+/** Closes the {@link TierStorage}, the opened channels should be closed. */
+void close();
+
+/** Releases the {@link TierStorage}, all resources should be released. */
+void release();

Review Comment:
   Updated. `TierStorageWriter` now has only a `close` method, and `TierStorage` has only a `release` method.
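A minimal sketch of the split lifecycle, assuming the semantics from the quoted Javadoc: closing a writer ends writing (its opened channels are closed), while releasing the storage frees all of the tier's resources. `TierStorageWriterSketch`, `TierStorageSketch`, and `InMemoryTierStorageSketch` are hypothetical stand-ins, not the PR's actual interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer side: only close(), which ends writing. */
interface TierStorageWriterSketch extends AutoCloseable {
    @Override
    void close(); // narrowed: no checked exception
}

/** Hypothetical storage side: only release(), which frees all tier resources. */
interface TierStorageSketch {
    TierStorageWriterSketch createWriter();

    void release();
}

/** Records lifecycle events so the ordering is easy to observe. */
final class InMemoryTierStorageSketch implements TierStorageSketch {
    private final List<String> events = new ArrayList<>();
    private boolean released;

    @Override
    public TierStorageWriterSketch createWriter() {
        if (released) {
            throw new IllegalStateException("Tier already released.");
        }
        events.add("writer-created");
        return () -> events.add("writer-closed");
    }

    @Override
    public void release() {
        // Idempotent: releasing twice has no further effect.
        if (!released) {
            released = true;
            events.add("released");
        }
    }

    List<String> events() {
        return events;
    }
}
```

Giving each type a single teardown method makes the ownership explicit: the writer's owner closes it, and the storage's owner releases it.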






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072696


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier writer for the tiered store. The storage tier writers is independent of
+ * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle
+ * data.
+ */
+public interface TierWriter {
+
+void setup() throws IOException;
+
+/** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */
+TierStorage createPartitionTierStorage();

Review Comment:
   Fixed the issue.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164072367


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be
+ * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions.
+ */
+public interface TierStorage {
+
+/** Setups the {@link TierStorage}. */
+void setup() throws IOException;
+
+/** Emits the finished {@link Buffer} to a specific subpartition. */
+boolean emit(
+int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId)

Review Comment:
   It has been renamed to `write` in `TierStorageWriter`.
   Only two arguments, `consumerId` and `finishedBuffer`, are kept.
   `isEndOfPartition` and `segmentId` are removed from the `write` method.
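A minimal sketch of the two-argument `write(consumerId, finishedBuffer)` shape: each finished buffer is routed to its consumer's queue, and no segment or end-of-partition flags travel with the call (where that state is tracked instead is not stated in this comment). `PerConsumerWriterSketch` is hypothetical, `byte[]` stands in for Flink's `Buffer`, and the consumer ID is assumed to be an `int`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical two-argument writer; byte[] stands in for Flink's Buffer. */
final class PerConsumerWriterSketch {
    private final Map<Integer, Queue<byte[]>> buffersByConsumer = new HashMap<>();

    /** Appends a finished buffer to the FIFO queue of the consumer it is destined for. */
    void write(int consumerId, byte[] finishedBuffer) {
        buffersByConsumer.computeIfAbsent(consumerId, id -> new ArrayDeque<>()).add(finishedBuffer);
    }

    /** Returns the next buffer for a consumer, or null if none is queued. */
    byte[] poll(int consumerId) {
        Queue<byte[]> queue = buffersByConsumer.get(consumerId);
        return queue == null ? null : queue.poll();
    }
}
```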






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-12 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1164069146


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TierType.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+/** The storage types for tiered store. */
+public enum TierType {

Review Comment:
   I have made `TierType` a private enum in `TieredStoreShuffleEnvironment`.
   The enum does not imply that only these three types are supported; it only makes it easy to add new tiers if needed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161504837


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is 
independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write 
shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write 
shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read 
shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener 
availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check 
whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);

Review Comment:
   The method has been removed from the API. It will be part of the netty-based consumer API in a subsequent change.
   
   In that change, we will use the `producerId` instead.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500134


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);
+
+/**
+ * Before reading the segment data, the consumer calls the method to check whether the {@link
+ * StorageTier} has the segment.
+ */
+boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+/** Sets the metric group. */
+void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);

Review Comment:
   Fixed. This can live in the `BufferAccumulator`; it does not need to be in this PR.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502932


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);
+
+/**
+ * Before reading the segment data, the consumer calls the method to check whether the {@link
+ * StorageTier} has the segment.
+ */
+boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+/** Sets the metric group. */
+void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics);
+
+void close();
+
+void release();

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502777


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}
+
+/**
+ * Currently, only these tier combinations are supported. If the configured tiers is contained
+ * in the following combinations, an exception will be thrown.
+ */
+enum SupportedTierCombinations implements TieredStoreMode {
+MEMORY,
+MEMORY_DISK,
+MEMORY_DISK_REMOTE,

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a common entrypoint of emitting records to tiered store. These records will be emitted to
+ * the {@link BufferAccumulator} to accumulate and transform into finished buffers.
+ */
+public class TieredStoreProducerImpl implements TieredStoreProducer {
+
+private final boolean isBroadcastOnly;
+
+private final int numSubpartitions;
+
+private final BufferAccumulator bufferAccumulator;
+
+public TieredStoreProducerImpl(
+StorageTier[] storageTiers,
+int numSubpartitions,
+int bufferSize,
+boolean isBroadcastOnly,
+@Nullable BufferCompressor bufferCompressor) {
+this.isBroadcastOnly = isBroadcastOnly;
+this.numSubpartitions = numSubpartitions;
+
+this.bufferAccumulator = new BufferAccumulatorImpl();

Review Comment:
   I have updated the producer constructor and made this a constructor argument.
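A minimal sketch of passing the accumulator in through the constructor instead of calling `new BufferAccumulatorImpl()` inside it. `RecordAccumulator` and `ProducerSketch` are hypothetical stand-ins, not Flink's `BufferAccumulator` or `TieredStoreProducerImpl`. The benefit is that a test can pass a recording accumulator and check exactly what the producer forwards.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for the accumulator abstraction (not Flink's BufferAccumulator). */
interface RecordAccumulator {
    /** Receives one serialized record destined for the given subpartition. */
    void receive(ByteBuffer record, int subpartitionId);
}

/** Hypothetical producer that is given its accumulator instead of creating one itself. */
final class ProducerSketch {
    private final int numSubpartitions;
    private final RecordAccumulator accumulator;

    ProducerSketch(int numSubpartitions, RecordAccumulator accumulator) {
        if (numSubpartitions <= 0) {
            throw new IllegalArgumentException("numSubpartitions must be positive");
        }
        if (accumulator == null) {
            throw new NullPointerException("accumulator");
        }
        this.numSubpartitions = numSubpartitions;
        this.accumulator = accumulator;
    }

    /** Forwards a record to exactly one subpartition. */
    void emit(ByteBuffer record, int subpartitionId) {
        if (subpartitionId < 0 || subpartitionId >= numSubpartitions) {
            throw new IllegalArgumentException("Invalid subpartition: " + subpartitionId);
        }
        accumulator.receive(record, subpartitionId);
    }

    /** Forwards a record to every subpartition; duplicate() gives each call its own position. */
    void broadcast(ByteBuffer record) {
        for (int i = 0; i < numSubpartitions; i++) {
            accumulator.receive(record.duplicate(), i);
        }
    }
}
```

In a test, a lambda that records subpartition ids is enough to act as the accumulator.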






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161502026


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreResultPartition.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier;
+import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TieredStoreResultPartition} appends records and events to the tiered store, which supports
+ * the upstream dynamically switches storage tier for writing shuffle data, and the downstream will
+ * read data from the relevant storage tier.
+ */
+public class TieredStoreResultPartition extends ResultPartition implements ChannelStateHolder {

Review Comment:
   This is a typo, fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161501622


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreConfiguration.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The configuration for tiered store. */
+public class TieredStoreConfiguration {
+
+private final String tieredStoreTiers;

Review Comment:
   This change should not be included in this PR, so I have removed it. I will use an explicit type when submitting the new PR.
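A minimal sketch of what an explicit type could look like in place of the raw `String tieredStoreTiers` field. It assumes the configured value names one of the combinations from `SupportedTierCombinations` in this PR. `TierCombination`, `parse` and `TieredStoreConfigurationSketch` are hypothetical names. The point is that the string is parsed once and rejected early if it is unsupported, so the rest of the code never handles a raw string.

```java
import java.util.Locale;

/** Hypothetical explicit type for the configured tiers, replacing a raw String. */
enum TierCombination {
    MEMORY,
    MEMORY_DISK,
    MEMORY_DISK_REMOTE;

    /** Parses a configuration value; unsupported combinations fail immediately. */
    static TierCombination parse(String value) {
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported tier combination: " + value, e);
        }
    }
}

/** Hypothetical configuration holder that stores the typed value, not the string. */
final class TieredStoreConfigurationSketch {
    private final TierCombination tiers;

    TieredStoreConfigurationSketch(TierCombination tiers) {
        this.tiers = tiers;
    }

    TierCombination getTiers() {
        return tiers;
    }
}
```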






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500942


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/MemorySegmentAndChannel.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** {@link MemorySegment} info and the corresponding channel index. */
+public class MemorySegmentAndChannel {

Review Comment:
   This class is used in `BufferAccumulator`. The finished memory segment (without a buffer recycler) should be emitted to each tier.
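A minimal sketch of the pairing described above. It assumes a plain `ByteBuffer` as a stand-in for Flink's `MemorySegment`, and `SegmentAndChannel` and `SegmentingAccumulator` are hypothetical names. The accumulator cuts records into fixed-size finished segments and tags each one with its channel index. Because no recycler is attached, the tier that drains a segment takes ownership of it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in: a finished segment plus its channel index, with no recycler attached. */
final class SegmentAndChannel {
    private final ByteBuffer segment; // finished data; ownership passes to the tier
    private final int channelIndex; // subpartition the data belongs to

    SegmentAndChannel(ByteBuffer segment, int channelIndex) {
        this.segment = segment;
        this.channelIndex = channelIndex;
    }

    ByteBuffer getSegment() {
        return segment;
    }

    int getChannelIndex() {
        return channelIndex;
    }
}

/** Hypothetical accumulator that copies records into fixed-size finished segments. */
final class SegmentingAccumulator {
    private final int segmentSize;
    private final List<SegmentAndChannel> finished = new ArrayList<>();

    SegmentingAccumulator(int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive");
        }
        this.segmentSize = segmentSize;
    }

    /** Copies the record into segment-sized chunks; the caller's buffer position is untouched. */
    void receive(ByteBuffer record, int channelIndex) {
        ByteBuffer src = record.duplicate();
        while (src.hasRemaining()) {
            int n = Math.min(segmentSize, src.remaining());
            ByteBuffer chunk = ByteBuffer.allocate(n);
            for (int i = 0; i < n; i++) {
                chunk.put(src.get());
            }
            chunk.flip(); // make the chunk readable from position 0
            finished.add(new SegmentAndChannel(chunk, channelIndex));
        }
    }

    /** Hands all finished segments to the caller (for example a tier) and clears the local list. */
    List<SegmentAndChannel> drainFinished() {
        List<SegmentAndChannel> out = new ArrayList<>(finished);
        finished.clear();
        return out;
    }
}
```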






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161500038


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);
+
+/**
+ * Before reading the segment data, the consumer calls the method to check whether the {@link
+ * StorageTier} has the segment.
+ */
+boolean hasCurrentSegment(int subpartitionId, int segmentIndex);

Review Comment:
   The method has been removed from the API. It will be part of the netty-based upstream consumer API in a subsequent PR.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;
+
+/**
+ * Before start a new segment, the writer side calls the method to check whether the {@link
+ * StorageTier} can store the next segment.
+ */
+boolean canStoreNextSegment(int subpartitionId);
+
+/**
+ * Before reading the segment data, the consumer calls the method to check whether the {@link
+ * StorageTier} has the segment.
+ */
+boolean hasCurrentSegment(int subpartitionId, int segmentIndex);
+
+/** Sets th

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161499623


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;
+
+/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */
+TierWriter createPartitionTierWriter();
+
+/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */
+TierReaderView createTierReaderView(
+int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException;

Review Comment:
   OK, it has been removed from the API. The method will be part of the netty-based upstream consumer API in a subsequent PR.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,

Review Comment:
   Fixed.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common;
+
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+
+import java.io.IOException;
+
+/**
+ * The single storage tier for the tiered store. The storage tiers is independent of each other.
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and
+ * create the {@link TierReaderView} to consume shuffle data.
+ */
+public interface StorageTier {
+
+void setup() throws IOException;

Review Comment:
   Fixed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498489


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.
+ */
+enum TierType implements TieredStoreMode {
+IN_CACHE,
+IN_MEM,
+IN_DISK,
+IN_REMOTE,
+}
+
+/**
+ * Currently, only these tier combinations are supported. If the configured tiers is contained
+ * in the following combinations, an exception will be thrown.
+ */
+enum SupportedTierCombinations implements TieredStoreMode {
+MEMORY,
+MEMORY_DISK,
+MEMORY_DISK_REMOTE,
+}

Review Comment:
   `SupportedTierCombinations` has been removed.






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498297


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {
+
+/**
+ * The shuffle records will be accumulated to the finished buffer before writing to the
+ * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}.

Review Comment:
   `IN_CACHE` is removed from `TierType`.
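The quoted Javadoc describes records being accumulated into finished buffers before they are written to a tier. With `IN_CACHE` removed from `TierType`, that accumulation presumably no longer counts as a tier of its own. The following is a minimal sketch of a standalone accumulator. It assumes a fixed buffer size and a callback that receives each finished buffer, standing in for a tier writer. `BufferAccumulator`, `receive`, and `flush` are hypothetical names used for illustration, not the PR's implementation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: records are copied into fixed-size buffers, and each finished
 * buffer is handed to a tier writer callback. Not a Flink API.
 */
public class BufferAccumulatorSketch {

    static final class BufferAccumulator {
        private final int bufferSize;
        private final Consumer<ByteBuffer> finishedBufferListener;
        private ByteBuffer current;

        BufferAccumulator(int bufferSize, Consumer<ByteBuffer> finishedBufferListener) {
            if (bufferSize <= 0) {
                throw new IllegalArgumentException("bufferSize must be positive");
            }
            this.bufferSize = bufferSize;
            this.finishedBufferListener = finishedBufferListener;
        }

        /** Copies the record into buffers; a record may span several buffers. */
        void receive(ByteBuffer record) {
            while (record.hasRemaining()) {
                if (current == null) {
                    current = ByteBuffer.allocate(bufferSize);
                }
                int n = Math.min(record.remaining(), current.remaining());
                ByteBuffer slice = record.slice();
                slice.limit(n);
                current.put(slice);
                record.position(record.position() + n);
                if (!current.hasRemaining()) {
                    finishCurrent(); // buffer is full: hand it to the tier
                }
            }
        }

        /** Hands over a partially filled buffer, e.g. when the writer is flushed. */
        void flush() {
            if (current != null && current.position() > 0) {
                finishCurrent();
            }
        }

        private void finishCurrent() {
            current.flip(); // switch from writing to reading
            finishedBufferListener.accept(current);
            current = null;
        }
    }

    public static void main(String[] args) {
        List<ByteBuffer> written = new ArrayList<>();
        BufferAccumulator accumulator = new BufferAccumulator(4, written::add);
        accumulator.receive(ByteBuffer.wrap(new byte[6]));
        accumulator.receive(ByteBuffer.wrap(new byte[3]));
        accumulator.flush();
        for (ByteBuffer b : written) {
            System.out.print(b.remaining() + " "); // prints "4 4 1 "
        }
    }
}
```

Because the accumulator only knows about a `Consumer<ByteBuffer>`, it does not need to be listed among the tier types. Any tier writer can receive its finished buffers.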






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161498092


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;
+
+/** The tiered store mode for {@link TieredStoreResultPartition}. */
+public interface TieredStoreMode {

Review Comment:
   `TieredStoreMode` has been removed; we now use `TierType` in `o.a.f.r.io.network.partition.tieredstore`.
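With `TieredStoreMode` and `SupportedTierCombinations` removed, `TierType` is left as a standalone enum. The following is a minimal sketch of that idea. It assumes the enum keeps the values remaining in the earlier diff (`IN_MEM`, `IN_DISK`, `IN_REMOTE`) and that a set of tiers is expressed as an ordered list of `TierType` values instead of a fixed combination enum. The `parseTiers` helper and its comma-separated input format are hypothetical and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class TierTypeSketch {

    /** Standalone tier enum; IN_CACHE is omitted because it was removed from TierType. */
    enum TierType {
        IN_MEM,
        IN_DISK,
        IN_REMOTE
    }

    /**
     * Hypothetical helper: parses a list such as "IN_MEM,IN_DISK" into an ordered,
     * duplicate-free, unmodifiable list of TierType values.
     */
    static List<TierType> parseTiers(String config) {
        List<TierType> tiers = new ArrayList<>();
        Set<TierType> seen = EnumSet.noneOf(TierType.class);
        for (String token : config.split(",")) {
            String name = token.trim().toUpperCase(Locale.ROOT);
            if (name.isEmpty()) {
                continue;
            }
            // valueOf throws IllegalArgumentException for unknown tier names
            TierType tier = TierType.valueOf(name);
            if (!seen.add(tier)) {
                throw new IllegalArgumentException("Duplicate tier: " + tier);
            }
            tiers.add(tier);
        }
        if (tiers.isEmpty()) {
            throw new IllegalArgumentException("At least one tier must be configured");
        }
        return Collections.unmodifiableList(tiers);
    }

    public static void main(String[] args) {
        System.out.println(parseTiers("in_mem, IN_DISK")); // prints [IN_MEM, IN_DISK]
    }
}
```

Validating an arbitrary ordered list in this way, rather than enumerating every allowed combination, is one possible design choice. Whether the PR validates tier combinations at all is not stated in this thread.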






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-04-10 Thread via GitHub


TanYuxin-tyx commented on code in PR #22330:
URL: https://github.com/apache/flink/pull/22330#discussion_r1161497423


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java:
##
@@ -0,0 +1,44 @@
+
+package org.apache.flink.runtime.io.network.partition.tieredstore.upstream;

Review Comment:
   I have moved these classes to `o.a.f.r.i.n.partition.hybrid.tiered`, and `TieredStoreMode` has been removed.


