[GitHub] [flink] Thesharing commented on a diff in pull request #22674: [FLINK-32201][runtime]Automatically determine if the shuffle descriptor needs to be offloaded by the blob server based on the nu

2023-05-29 Thread via GitHub


Thesharing commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1209723546


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##
@@ -104,4 +122,14 @@ public void 
markPartitionFinished(IntermediateResultPartition resultPartition) {
 checkNotNull(
 
resultPartitionIdToIndex.get(resultPartition.getPartitionId();
 }
+
+private boolean isForceOffload(ShuffleDescriptorAndIndex[] 
shuffleDescriptorsToSerialize) {
+// The unknown shuffle descriptor will be compressed, so don't take it 
into account.
+long numKnownShuffleDescriptors =

Review Comment:
   Could we add some tests for the new logics, particularly for this part?



##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##
@@ -104,4 +122,14 @@ public void 
markPartitionFinished(IntermediateResultPartition resultPartition) {
 checkNotNull(
 
resultPartitionIdToIndex.get(resultPartition.getPartitionId();
 }
+
+private boolean isForceOffload(ShuffleDescriptorAndIndex[] 
shuffleDescriptorsToSerialize) {

Review Comment:
   How about `shouldOffload` instead of `isForceOffload`? I think we're not 
forced to do this optimization.藍 



##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##
@@ -38,6 +39,14 @@
 
 /** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
 public class CachedShuffleDescriptors {
+/**
+ * The threshold to force enable offload shuffle descriptors via blob 
server. This is a fixed
+ * value since it is difficult for users to configure. This default value 
means JobManager need
+ * to serialized and transport 1 shuffle descriptors(almost 200KB) to 
1 consumer(2GB in
+ * total)
+ */
+private static final int FORCE_OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD = 
1 * 1;

Review Comment:
   How about making this option configurable but not exposed to the user manual?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


reswqa commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209718857


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The hash implementation of the {@link BufferAccumulator}. The {@link 
BufferAccumulator} receives
+ * the records from {@link TieredStorageProducerClient} and the records will 
accumulate and
+ * transform to finished buffers. The accumulated buffers will be transferred 
to the corresponding
+ * tier dynamically.
+ *
+ * To avoid the buffer waiting deadlock between the subpartitions, the 
{@link
+ * HashBufferAccumulator} requires at least n buffers (n is the parallelism) 
to make sure that each

Review Comment:
   > n is the parallelism
   
   It's a bit strange to use `parallelism` here. I wonder whose parallelism it 
is, upstream or downstream? Can we change it to `number of subpartitions`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32215) Date format in flink-statefun startupPosition documentation is incorrect

2023-05-29 Thread Leijurv (Jira)
Leijurv created FLINK-32215:
---

 Summary: Date format in flink-statefun startupPosition 
documentation is incorrect
 Key: FLINK-32215
 URL: https://issues.apache.org/jira/browse/FLINK-32215
 Project: Flink
  Issue Type: Bug
Reporter: Leijurv


The example string provided in [the 
documentation|https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/io/apache-kafka/]
 does not actually parse. It causes an exception: `Unable to parse date string 
for startup position: 2020-02-01 04:15:00.00 Z; the date should conform to the 
pattern -MM-dd HH:mm:ss.SSS Z`

[https://github.com/apache/flink-statefun/pull/329]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs commented on pull request #22671: [FLINK-32211][sql-client] Supports row format in executor

2023-05-29 Thread via GitHub


FangYongs commented on PR #22671:
URL: https://github.com/apache/flink/pull/22671#issuecomment-1567720336

   Hi @libenchao Please help to review this PR when you are free, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on PR #22501:
URL: https://github.com/apache/flink/pull/22501#issuecomment-1567713695

   @xintongsong Thanks a lot for the quick review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32214) Fetch more info in the FlinkDeployment status by using the overview API

2023-05-29 Thread Xin Hao (Jira)
Xin Hao created FLINK-32214:
---

 Summary: Fetch more info in the FlinkDeployment status by using 
the overview API
 Key: FLINK-32214
 URL: https://issues.apache.org/jira/browse/FLINK-32214
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Xin Hao


Currently, we are using `/config` API to fetch the FlinkDeployment's basic info.
{code:java}
{
    "refresh-interval": 3000,
    "timezone-name": "Coordinated Universal Time",
    "timezone-offset": 0,
    "flink-version": "1.15.3",
    "flink-revision": "c41c8e5 @ 2022-11-10T10:39:02+01:00",
    "features": {
        "web-submit": true,
        "web-cancel": false
    }
} {code}
Can we switch to using `/overview` to obtain more helpful info?
{code:java}
{
    "taskmanagers": 27,
    "slots-total": 27,
    "slots-available": 0,
    "jobs-running": 27,
    "jobs-finished": 0,
    "jobs-cancelled": 2,
    "jobs-failed": 0,
    "flink-version": "1.15.3",
    "flink-commit": "c41c8e5"
} {code}
The most useful one for me is `jobs-running`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31640) Write the accumulated buffers to the right storage tier

2023-05-29 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-31640:
--
Summary: Write the accumulated buffers to the right storage tier  (was: 
Introduce segment index tracker)

> Write the accumulated buffers to the right storage tier
> ---
>
> Key: FLINK-31640
> URL: https://issues.apache.org/jira/browse/FLINK-31640
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209661397


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/message/MessageUtils.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps.message;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.util.Bits;
+
+/** Utility functions for processing messages. */
+public class MessageUtils {
+
+/** Retrieves the message type from the byte array. */
+public static MessageType getMessageType(byte[] bytesData) {
+char type = Bits.getChar(bytesData, 0);
+return MessageType.valueOf(type);
+}
+
+/** Reads a long array from the byte array starting from the given offset. 
*/
+public static long[] readLongArray(byte[] bytesData, int offset) {
+int size = Bits.getInt(bytesData, offset);
+offset += Integer.BYTES;
+long[] result = new long[size];
+for (int i = 0; i < size; i++) {
+result[i] = Bits.getLong(bytesData, offset);
+offset += Long.BYTES;
+}
+return result;
+}
+
+/**
+ * Writes a long array to the byte array starting from the given offset.
+ *
+ * @return the next position to write on.
+ */
+public static int writeLongArray(long[] array, byte[] bytesData, int 
offset) {
+Bits.putInt(bytesData, offset, array.length);
+offset += Integer.BYTES;
+for (int i = 0; i < array.length; i++) {
+Bits.putLong(bytesData, offset, array[i]);
+offset += Long.BYTES;
+}
+return offset;
+}
+
+/** Returns the size of a long array in bytes. */
+public static int getLongArraySizeInBytes(long[] array) {
+return Integer.BYTES + array.length * Long.BYTES;
+}
+
+/** Reads a double array from the byte array starting from the given 
offset. */
+public static double[] readDoubleArray(byte[] bytesData, int offset) {
+int size = Bits.getInt(bytesData, offset);
+offset += Integer.BYTES;
+double[] result = new double[size];
+for (int i = 0; i < size; i++) {
+result[i] = Bits.getDouble(bytesData, offset);
+offset += Long.BYTES;
+}
+return result;
+}
+
+/**
+ * Writes a double array to the byte array starting from the given offset.
+ *
+ * @return the next position to write on.
+ */
+public static int writeDoubleArray(double[] array, byte[] bytesData, int 
offset) {

Review Comment:
   `Bits.java` is copy-paste from `java.io.Bits` without any modifications. 
Moreover, `writeDoubleArray` is only used is the ps-infra, so shall we keep it 
here for now?
   
   I have updated the name following the convention of `Bits.java` and rename 
the functions as `putXX` and `getXX`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209650931


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/message/MessageType.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps.message;
+
+/** Message Type between workers and servers. */
+public enum MessageType {
+ZEROS_TO_PUSH((char) 0),

Review Comment:
   Thanks for pointing this out. I have renamed them as 
`INITIALIZE_MODEL_AS_ZERO, PULL_INDEX, PULLED_VALUE and PUSH_KV` and also added 
java doc for these enums.
   
   Note that `PULLED_VALUE` adds the `~ed` suffix because it is sent from 
servers to workers, different from other message types. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22675: [FLINK-32211][core] Add get off heap buffer in segment

2023-05-29 Thread via GitHub


flinkbot commented on PR #22675:
URL: https://github.com/apache/flink/pull/22675#issuecomment-1567669992

   
   ## CI report:
   
   * 8856bd988ad35e6a947f2be5659dbbc993c1cbe2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode

2023-05-29 Thread Paul Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727239#comment-17727239
 ] 

Paul Lin commented on FLINK-26541:
--

I've created 
[FLIP-316|https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver],
 please join the [discussion 
thread|https://lists.apache.org/thread/7zyojksml5xj77fdokcpttf550qflyqm] if 
you're interested.

> SQL Client should support submitting SQL jobs in application mode
> -
>
> Key: FLINK-26541
> URL: https://issues.apache.org/jira/browse/FLINK-26541
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / 
> Client
>Reporter: Jark Wu
>Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and 
> per-job mode. As the community going to drop the per-job mode (FLINK-26000), 
> SQL Client should support application mode as well. Otherwise, SQL Client can 
> only submit SQL in session mode then, but streaming jobs should be submitted 
> in per-job or application mode to have bettter resource isolation.
> Disucssions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs opened a new pull request, #22675: [FLINK-32211][core] Add get off heap buffer in segment

2023-05-29 Thread via GitHub


FangYongs opened a new pull request, #22675:
URL: https://github.com/apache/flink/pull/22675

   ## What is the purpose of the change
   
   This PR aims to add `getOffHeapBuffer` in `MemorySegment` and iceberg/paimon 
sink operator can create writer buffer with managed memory in Flink
   
   
   ## Brief change log
 - Add `getOffHeapBuffer` in `MemorySegment`
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-05-29 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1206315147


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedobjects/SharedObjectsStreamOperator.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.sharedobjects;
+
+/** Interface for all operators that need to access the shared objects. */
+public interface SharedObjectsStreamOperator {

Review Comment:
   #237 proposes a brilliant solution about abstracting iterative computations 
inspired by Parameter Server. I believe both solutions can work well in many 
algorithms/scenarios.
   
   Before comparing two solutions, there are two facts about PS infra I must 
emphasize:
   
   One fact is the PS infra is built on the DataStream APIs, which means there 
will be no performance improvement compared to implement with raw DataStream 
APIs. So we mainly discuss its usability with aspect to developers.
   
   The other fact is the current status of functionalities shown in #237 cannot 
fully meet the requirements of GBDT implementation.  `MessageType`, model 
format, reduce logic of messages, etc. are all fixed/hard-coded with respect to 
gradient-based algorithms. The usability will drop significantly if forcing 
GBDT implementation to use current APIs.
   
   Therefore, to make a reasonable comparison between two solutions, I assume 
an extended version of current PS infra which supports POJO message types and 
POJO model data, user-defined `reduce` function, etc. Here are my thoughts 
under this assumption:
   
   1. Framework Intrusiveness
   
   Using PS infra means developers cannot use DataStream APIs in iterations 
anymore. Then, there are cases where PS infra cannot implement: 
 - side outputs: evaluation result streams; prediction and model streams in 
online cases.
 - partition/join/coGroup of training data sets: AUC calculation after 
model update, ALS, SimRank.
   
   As for SharedObjects, it is an augment to DataStream APIs. There is no extra 
limitation to developers.
   
   The intrusiveness also influences the observation of operators when job 
running as, in PS infra, multiple computations are merged in to one operator, 
like in/out stats, checkpoint status. This decreases usability to both 
developers and end-users.
   
   2. Applicable scenarios
   
   Besides inapplicable cases mentioned above, PS infra cannot work in 
non-iteration cases. But SharedObjects can work. One possible case is to 
improve consecutive joins with a same dataset by reducing a copy of dataset.
   
   3. Learning curve
   
   PS infra provides a whole set of concepts and interfaces such as Message, 
ModelUpdater, ProcessStage, TrainingUtils, etc., which are not related to the 
existing DataStream API and have a steeper learning curve.
   
   SharedObjects provides two interfaces, SharedObjectsUtils and 
SharedObjectsContext, and can be developed directly based on the existing 
DataStream API code, making it easier for developers to accept.
   
   
   Overall speaking, I think both solutions can coexist because they are on 
different levels of APIs and have no conflicts. How about you? @zhipeng93 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32213) Add get off heap buffer in memory segment

2023-05-29 Thread Fang Yong (Jira)
Fang Yong created FLINK-32213:
-

 Summary: Add get off heap buffer in memory segment
 Key: FLINK-32213
 URL: https://issues.apache.org/jira/browse/FLINK-32213
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.18.0
Reporter: Fang Yong


When flink job writes data to data lake such as paimon, iceberg and hudi, the 
sink will write data to writer buffer first, then flush the data to file 
system. To manage the writer buffer better, we'd like to allocate segment from 
managed memory in flink and get off heap buffer to create writer buffer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-05-29 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1209635209


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTRunner.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.gbt;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationConfig;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.ReplayableDataStreamList;
+import org.apache.flink.ml.classification.gbtclassifier.GBTClassifier;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.common.gbt.defs.BoostingStrategy;
+import org.apache.flink.ml.common.gbt.defs.FeatureMeta;
+import org.apache.flink.ml.common.gbt.defs.LossType;
+import org.apache.flink.ml.common.gbt.defs.Node;
+import org.apache.flink.ml.common.gbt.defs.TaskType;
+import org.apache.flink.ml.common.gbt.defs.TrainContext;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.regression.gbtregressor.GBTRegressor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Runs a gradient boosting trees implementation. */
+public class GBTRunner {
+
+private static boolean isVectorType(TypeInformation typeInfo) {
+return typeInfo instanceof DenseVectorTypeInfo
+|| typeInfo instanceof SparseVectorTypeInfo
+|| typeInfo instanceof VectorTypeInfo;
+}
+
+public static DataStream train(Table data, BaseGBTParams 
estimator) {
+String[] featuresCols = estimator.getFeaturesCols();
+TypeInformation[] featuresTypes =
+Arrays.stream(featuresCols)
+.map(d -> 
TableUtils.getTypeInfoByName(data.getResolvedSchema(), d))
+.toArray(TypeInformation[]::new);
+for (int i = 0; i < featuresCols.length; i += 1) {
+Preconditions.checkArgument(
+null != featuresTypes[i],
+String.format(
+"Column name %s not existed in the input data.", 
featuresCols[i]));
+}
+
+boolean isInputVector = featuresCols.length == 1 && 
isVectorType(featuresTypes[0]);
+return train(data, getStrategy(estimator, isInputVector));
+}
+
+/** Trains a gradient boosting tree model with given data and parameters. 
*/
+static DataStream train(Table dataTable, BoostingStrategy 
strategy) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
dataTable).getTableEnvironment();
+Tuple2> preprocessResult =
+strategy.isInputVector
+? Preprocess.preprocessVecCol(dataTable, strategy)
+: Preprocess.preprocessCols(dataTable, strategy);
+dataTable = preprocessResult.f0;
+DataStream featureMeta = preprocessResult.f1;
+
+DataStream data = tEnv.toDataStream(dataTable);
+DataStream> labelSumCount =
+

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-05-29 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1209634597


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTRunner.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.gbt;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationConfig;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.ReplayableDataStreamList;
+import org.apache.flink.ml.classification.gbtclassifier.GBTClassifier;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.common.gbt.defs.BoostingStrategy;
+import org.apache.flink.ml.common.gbt.defs.FeatureMeta;
+import org.apache.flink.ml.common.gbt.defs.LossType;
+import org.apache.flink.ml.common.gbt.defs.Node;
+import org.apache.flink.ml.common.gbt.defs.TaskType;
+import org.apache.flink.ml.common.gbt.defs.TrainContext;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.regression.gbtregressor.GBTRegressor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Runs a gradient boosting trees implementation. */
+public class GBTRunner {
+
+private static boolean isVectorType(TypeInformation typeInfo) {
+return typeInfo instanceof DenseVectorTypeInfo
+|| typeInfo instanceof SparseVectorTypeInfo
+|| typeInfo instanceof VectorTypeInfo;
+}
+
+public static DataStream train(Table data, BaseGBTParams 
estimator) {
+String[] featuresCols = estimator.getFeaturesCols();
+TypeInformation[] featuresTypes =
+Arrays.stream(featuresCols)
+.map(d -> 
TableUtils.getTypeInfoByName(data.getResolvedSchema(), d))
+.toArray(TypeInformation[]::new);
+for (int i = 0; i < featuresCols.length; i += 1) {
+Preconditions.checkArgument(
+null != featuresTypes[i],
+String.format(
+"Column name %s not existed in the input data.", 
featuresCols[i]));
+}
+
+boolean isInputVector = featuresCols.length == 1 && 
isVectorType(featuresTypes[0]);
+return train(data, getStrategy(estimator, isInputVector));
+}
+
+/** Trains a gradient boosting tree model with given data and parameters. 
*/
+static DataStream train(Table dataTable, BoostingStrategy 
strategy) {
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) 
dataTable).getTableEnvironment();
+Tuple2> preprocessResult =
+strategy.isInputVector
+? Preprocess.preprocessVecCol(dataTable, strategy)
+: Preprocess.preprocessCols(dataTable, strategy);
+dataTable = preprocessResult.f0;
+DataStream featureMeta = preprocessResult.f1;
+
+DataStream data = tEnv.toDataStream(dataTable);
+DataStream> labelSumCount =
+

[jira] (FLINK-32194) Elasticsearch connector should remove the dependency on flink-shaded

2023-05-29 Thread Yuxin Tan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32194 ]


Yuxin Tan deleted comment on FLINK-32194:
---

was (Author: tanyuxin):
[~Sergey Nuyanzin]  Could you help review this change? It is also like 
FLINK-32187.

> Elasticsearch connector should remove the dependency on flink-shaded
> 
>
> Key: FLINK-32194
> URL: https://issues.apache.org/jira/browse/FLINK-32194
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Affects Versions: elasticsearch-4.0.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-4.0.0
>
>
> The Elasticsearch connector depends on flink-shaded. With the externalization 
> of the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209272423


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/MirrorWorkerOperator.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.common.ps.message.ValuesPulledM;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Merges the message from different servers for one pull request.
+ *
+ * Note that for each single-thread worker, there are at exactly 
#numServers pieces for each pull
+ * request in the feedback edge.
+ */
+public class MirrorWorkerOperator extends AbstractStreamOperator

Review Comment:
   I name it as `mirror` here for the following two reasons:
   - This operator merges/concates the answer from SeverOperaotr and feeds it 
to WorkerOperator. In the traditional parameter server architecture, the 
concatenation happens on workers.
   - It is colocated with Worker operator.
   
   I am also open to other names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209272423


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/MirrorWorkerOperator.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.common.ps.message.ValuesPulledM;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Merges the message from different servers for one pull request.
+ *
+ * Note that for each single-thread worker, there are at exactly 
#numServers pieces for each pull
+ * request in the feedback edge.
+ */
+public class MirrorWorkerOperator extends AbstractStreamOperator

Review Comment:
   I name it as `mirror` here for the following two reasons:
   - This operator merges/concates the answer from servers and feeds it to 
workers. In the traditional parameter server architecture, the concatenation 
happens on workers.
   - It is colocated with Worker operator.
   
   I am also open to other names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on pull request #22583: [FLINK-31673][jdbc-driver] Add e2e test for flink jdbc driver

2023-05-29 Thread via GitHub


FangYongs commented on PR #22583:
URL: https://github.com/apache/flink/pull/22583#issuecomment-1567634131

   Thanks @libenchao Sounds good to me, I'll move it there


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2023-05-29 Thread Matheus Felisberto (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matheus Felisberto updated FLINK-32212:
---
Description: 
After running for a few hours the job starts to throw IllegalStateException and 
I can't figure out why. To restore the job, I need to manually delete the 
FlinkDeployment to be recreated and redeploy everything.
The jar is built-in into the docker image, hence is defined accordingly with 
the Operator's documentation:
{code:java}
// jarURI: local:///opt/flink/usrlib/my-job.jar {code}
I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work either. 

 
{code:java}
// Source: my-topic (1/2)#30587 
(b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
switched from DEPLOYING to FAILED with failure cause: 
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
If there is any other information that can help to identify the problem, please 
let me know.

 

  was:
After running for a few hours the job starts to throw IllegalStateException and 
I can't figure out why. To restore the job, I need to manually delete the 
FlinkDeployment to be recreated and re-deploy everything.
The jar is built-in into the docker image, hence is defined accordingly with 
the Operator's documentation:
{code:java}
// jarURI: local:///opt/flink/usrlib/my-job.jar {code}
I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work either. 

 
{code:java}
// Source: my-topic (1/2)#30587 
(b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
switched from DEPLOYING to FAILED with failure cause: 
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
If there is any other information that can help to identify the problem, please 
let me know.

 


> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>

[jira] [Updated] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2023-05-29 Thread Matheus Felisberto (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matheus Felisberto updated FLINK-32212:
---
Description: 
After running for a few hours the job starts to throw IllegalStateException and 
I can't figure out why. To restore the job, I need to manually delete the 
FlinkDeployment to be recreated and re-deploy everything.
The jar is built-in into the docker image, hence is defined accordingly with 
the Operator's documentation:
{code:java}
// jarURI: local:///opt/flink/usrlib/my-job.jar {code}
I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work either. 

 
{code:java}
// Source: my-topic (1/2)#30587 
(b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
switched from DEPLOYING to FAILED with failure cause: 
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
If there is any other information that can help to identify the problem, please 
let me know.

 

  was:
After running for a few hours the job starts to throw IllegalStateException and 
I can't figure out why. The jar is built-in into the docker image, hence is 
defined accordingly with the Operator's documentation:

 
{code:java}
// jarURI: local:///opt/flink/usrlib/my-job.jar {code}
 

I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work either. 

 
{code:java}
// Source: my-topic (1/2)#30587 
(b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
switched from DEPLOYING to FAILED with failure cause: 
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
If there is any other information that can help to identify the problem, please 
let me know.

 


> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>
> After running for a few hours the job starts to throw IllegalStateException 
> and I can't figure 

[jira] [Updated] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2023-05-29 Thread Matheus Felisberto (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matheus Felisberto updated FLINK-32212:
---
Environment: Apache Flink Kubernetes Operator 1.4  (was: I'm running my 
workload on Kubernetes using Operator v1.4 and Flink 1.16)

> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>
> After running for a few hours the job starts to throw IllegalStateException 
> and I can't figure out why. The jar is built-in into the docker image, hence 
> is defined accordingly with the Operator's documentation:
>  
> {code:java}
> // jarURI: local:///opt/flink/usrlib/my-job.jar {code}
>  
> I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work 
> either. 
>  
> {code:java}
> // Source: my-topic (1/2)#30587 
> (b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
> switched from DEPLOYING to FAILED with failure cause: 
> java.lang.IllegalStateException: The library registration references a 
> different set of library BLOBs than previous registrations for this job:
> old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
> new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
>     at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> If there is any other information that can help to identify the problem, 
> please let me know.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2023-05-29 Thread Matheus Felisberto (Jira)
Matheus Felisberto created FLINK-32212:
--

 Summary: Job restarting indefinitely after an 
IllegalStateException from BlobLibraryCacheManager
 Key: FLINK-32212
 URL: https://issues.apache.org/jira/browse/FLINK-32212
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.16.1
 Environment: I'm running my workload on Kubernetes using Operator v1.4 
and Flink 1.16
Reporter: Matheus Felisberto


After running for a few hours the job starts to throw IllegalStateException and 
I can't figure out why. The jar is built-in into the docker image, hence is 
defined accordingly with the Operator's documentation:

 
{code:java}
// jarURI: local:///opt/flink/usrlib/my-job.jar {code}
 

I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work either. 

 
{code:java}
// Source: my-topic (1/2)#30587 
(b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
switched from DEPLOYING to FAILED with failure cause: 
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
If there is any other information that can help to identify the problem, please 
let me know.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.

2023-05-29 Thread via GitHub


eskabetxe commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1209311925


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/Db2RowConverter.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.internal.converter;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
+ * Db2.
+ */
+public class Db2RowConverter extends AbstractJdbcRowConverter {

Review Comment:
   this should be moved to databases/db2/dialect



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2TableSourceITCase.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.table;
+
+import org.apache.flink.connector.jdbc.databases.db2.Db2TestBase;
+import org.apache.flink.connector.jdbc.databases.db2.dialect.Db2Dialect;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link Db2Dialect}. */
+public class Db2TableSourceITCase extends JdbcDynamicTableSourceITCase 
implements Db2TestBase {

Review Comment:
   can you change the name to Db2DynamicTableSourceITCase
   
   and see another implementation as this could be simplified a lot..



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/db2/table/Db2TableSinkITCase.java:
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.db2.table;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;

[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #3: [FLINK-15462][connectors] Add Trino dialect

2023-05-29 Thread via GitHub


eskabetxe commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/3#discussion_r1209310699


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/trino/TrinoPreparedStatementTest.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.trino;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TrinoPreparedStatementTest}. */
+public class TrinoPreparedStatementTest {

Review Comment:
   rebased and updated tests



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/trino/TrinoTableSinkITCase.java:
##
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.trino;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.databases.trino.TrinoDatabase;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 

[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #3: [FLINK-15462][connectors] Add Trino dialect

2023-05-29 Thread via GitHub


eskabetxe commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/3#discussion_r1209310393


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/trino/TrinoDialect.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.trino;
+
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.TrinoRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for Trino. */
+class TrinoDialect extends AbstractDialect {
+
+private static final long serialVersionUID = 1L;
+
+// Define MAX/MIN precision of TIMESTAMP type according to Trino docs:
+// https://trino.io/docs/current/language/types.html#timestamp-p
+private static final int MAX_TIMESTAMP_PRECISION = 12;
+private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+// Define MAX/MIN precision of DECIMAL type according to Trino docs:
+// https://trino.io/docs/current/language/types.html#decimal
+private static final int MAX_DECIMAL_PRECISION = 38;
+private static final int MIN_DECIMAL_PRECISION = 1;
+
+@Override
+public JdbcRowConverter getRowConverter(RowType rowType) {
+return new TrinoRowConverter(rowType);
+}
+
+@Override
+public String getLimitClause(long limit) {
+return "LIMIT " + limit;
+}
+
+@Override
+public Optional defaultDriverName() {
+return Optional.of("io.trino.jdbc.TrinoDriver");
+}
+
+@Override
+public String dialectName() {
+return "Trino";
+}
+
+@Override
+public String quoteIdentifier(String identifier) {
+return identifier;
+}
+
+@Override
+public Optional getUpsertStatement(
+String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+return Optional.empty();
+}
+
+@Override
+public Optional decimalPrecisionRange() {
+return Optional.of(Range.of(MIN_DECIMAL_PRECISION, 
MAX_DECIMAL_PRECISION));
+}
+
+@Override
+public Optional timestampPrecisionRange() {
+return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, 
MAX_TIMESTAMP_PRECISION));
+}
+
+@Override
+public Set supportedTypes() {
+// The data types used in Trino are list at:
+// https://trino.io/docs/current/language/types.html#
+
+return EnumSet.of(
+LogicalTypeRoot.BOOLEAN,
+LogicalTypeRoot.TINYINT,
+LogicalTypeRoot.SMALLINT,
+LogicalTypeRoot.INTEGER,
+LogicalTypeRoot.BIGINT,
+LogicalTypeRoot.DOUBLE,
+LogicalTypeRoot.DECIMAL,
+LogicalTypeRoot.VARCHAR,
+LogicalTypeRoot.CHAR,
+LogicalTypeRoot.VARBINARY,
+LogicalTypeRoot.DATE,
+LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+LogicalTypeRoot.ARRAY,
+LogicalTypeRoot.MAP,
+LogicalTypeRoot.ROW);

Review Comment:
   Added



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/trino/TrinoDialectTypeTest.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or 

[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-05-29 Thread via GitHub


WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209306292


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/ConsumerNettyService.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** {@link ConsumerNettyService} is used to consume buffer from netty client 
in consumer side. */
+public interface ConsumerNettyService {
+
+/**
+ * Set up the netty service in consumer side.
+ *
+ * @param inputChannels in consumer side.
+ * @param lastPrioritySequenceNumber is the array to record the priority 
sequence number.
+ * @param subpartitionAvailableNotifier is used to notify the subpartition 
is available.
+ */
+void setup(
+InputChannel[] inputChannels,
+int[] lastPrioritySequenceNumber,
+BiConsumer subpartitionAvailableNotifier);
+
+/**
+ * Read a buffer related to the specific subpartition from NettyService.

Review Comment:
   Fixed.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/CreditBasedBufferQueueViewImpl.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferContext;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The implementation of {@link CreditBasedBufferQueueView}. */
+public class CreditBasedBufferQueueViewImpl implements 
CreditBasedBufferQueueView {
+
+private final BufferAvailabilityListener availabilityListener;
+
+private int consumedBufferIndex = -1;
+
+@GuardedBy("viewLock")

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-05-29 Thread via GitHub


WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209305502


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in 
Tiered Store shuffle

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1209299273


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSegmentEvent.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * {@link EndOfSegmentEvent} is used to notify the downstream switch tiers in 
Tiered Store shuffle

Review Comment:
   All the `Tiered Store` should be renamed to `tiered storage`(Little case).



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/ConsumerNettyService.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+/** {@link ConsumerNettyService} is used to consume buffer from netty client 
in consumer side. */
+public interface ConsumerNettyService {
+
+/**
+ * Set up the netty service in consumer side.
+ *
+ * @param inputChannels in consumer side.
+ * @param lastPrioritySequenceNumber is the array to record the priority 
sequence number.
+ * @param subpartitionAvailableNotifier is used to notify the subpartition 
is available.
+ */
+void setup(
+InputChannel[] inputChannels,
+int[] lastPrioritySequenceNumber,
+BiConsumer subpartitionAvailableNotifier);
+
+/**
+ * Read a buffer related to the specific subpartition from NettyService.

Review Comment:
   Where is the NettyService?  We had better use {@link xxx} instead, 
otherwise, when renaming a class, the code similar to this maybe wrongly 
ignored.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/CreditBasedBufferQueueViewImpl.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import 

[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209272423


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/MirrorWorkerOperator.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.common.ps.message.ValuesPulledM;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Merges the message from different servers for one pull request.
+ *
+ * Note that for each single-thread worker, there are at exactly 
#numServers pieces for each pull
+ * request in the feedback edge.
+ */
+public class MirrorWorkerOperator extends AbstractStreamOperator

Review Comment:
   I name it as `mirror` here for the following two reasons:
   - This operator merges the answer from servers and feed it to workers. It is 
doing the concatenation for workers.
   - It is colocated with Worker operator.
   
   I am also open to other names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209272423


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/MirrorWorkerOperator.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.common.ps.message.ValuesPulledM;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Merges the message from different servers for one pull request.
+ *
+ * Note that for each single-thread worker, there are at exactly 
#numServers pieces for each pull
+ * request in the feedback edge.
+ */
+public class MirrorWorkerOperator extends AbstractStreamOperator

Review Comment:
   I name it as `mirror` here for the following two reasons:
   - This operator merges the answer from servers and feed it to workers. It is 
doing the concatenation for workers.
   - It is colocated with Worker operator.
   
   I am open to other names~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209268250


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/ps/message/ZerosToPushM.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.ps.message;
+
+import org.apache.flink.ml.util.Bits;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Message sent by worker to server that initializes the model as a dense 
array with defined range.
+ */
+public class ZerosToPushM implements Message {
+public final int workerId;
+public final int serverId;
+public final long startIndex;
+public final long endIndex;
+
+public static final MessageType MESSAGE_TYPE = MessageType.ZEROS_TO_PUSH;

Review Comment:
   Thanks for pointing this out. It has been removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209262254


##
flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java:
##
@@ -81,11 +82,49 @@ public DataFrame transform(DataFrame input) {
 public LogisticRegressionModelServable setModelData(InputStream... 
modelDataInputs)
 throws IOException {
 Preconditions.checkArgument(modelDataInputs.length == 1);
+List modelPieces = new ArrayList<>();
+while (true) {
+try {
+LogisticRegressionModelData piece =
+LogisticRegressionModelData.decode(modelDataInputs[0]);

Review Comment:
   Storing all segments in a list would probably leads to OOM here. When 
dealing with large models, we probably need to partition them into segments and 
store it one by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209259740


##
flink-ml-servable-core/src/main/java/org/apache/flink/ml/common/feature/LabeledLargePointWithWeight.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.feature;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** A data point to represent values that use long as index and double as 
values. */
+public class LabeledLargePointWithWeight {
+public Tuple2 features;

Review Comment:
   `SparseVector` currently only supports `int` index. However, the index of 
real-world data could exceeds the range of `int` and we use `long` to describe 
the index.
   
   It is a bit tricky to extend `SparseVector` to support `long` as index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhougit86 commented on pull request #22624: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…

2023-05-29 Thread via GitHub


zhougit86 commented on PR #22624:
URL: https://github.com/apache/flink/pull/22624#issuecomment-1567066058

   @snuyanzin Hi Master, Could you please help review the test case and let me 
know if it makes sense 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209259740


##
flink-ml-servable-core/src/main/java/org/apache/flink/ml/common/feature/LabeledLargePointWithWeight.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.feature;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** A data point to represent values that use long as index and double as 
values. */
+public class LabeledLargePointWithWeight {
+public Tuple2 features;

Review Comment:
   `SparseVector` currently only supports `int` index. However, the range of 
`int` cannot meet the requirements of high dimensional data.
   
   It is a bit tricky to extend `SparseVector` to support `long` as index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] TanYuxin-tyx commented on pull request #65: [FLINK-32194][connectors/elasticsearch] Remove the dependency on flink-shaded

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on PR #65:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/65#issuecomment-1567063394

   @reswqa  Thanks a lot for the quick review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-05-29 Thread via GitHub


WenDing-Y commented on PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1567061806

   Please help me review, thank you very much @snuyanzin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209256528


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionWithFtrl.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.feature.LabeledLargePointWithWeight;
+import org.apache.flink.ml.common.lossfunc.BinaryLogisticLoss;
+import org.apache.flink.ml.common.lossfunc.LossFunc;
+import org.apache.flink.ml.common.ps.training.IterationStageList;
+import org.apache.flink.ml.common.ps.training.ProcessStage;
+import org.apache.flink.ml.common.ps.training.PullStage;
+import org.apache.flink.ml.common.ps.training.PushStage;
+import org.apache.flink.ml.common.ps.training.SerializableConsumer;
+import org.apache.flink.ml.common.ps.training.TrainingContext;
+import org.apache.flink.ml.common.ps.training.TrainingUtils;
+import org.apache.flink.ml.common.updater.FTRL;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.ResettableIterator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableFunction;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Estimator which implements the large scale logistic regression algorithm 
using FTRL optimizer.
+ *
+ * See https://en.wikipedia.org/wiki/Logistic_regression.
+ */
+public class LogisticRegressionWithFtrl

Review Comment:
   Good point. The current implementation of `LogisticRegressionWithFtrl` and 
`LogisticRegression` employed different optimizers and and should be unified 
(probably in a later PR).
   
   I would like to abstract an optimizer for existing implementation of 
different models (LogisticRegression, LinearSVC, etc). There are two possible 
options:
   - For each optimizer and each model, we construct a new Estimator. For 
example `LogisticRegressionWithSGD` and `LogisticRegressionWithFTRL`, 
`LogisticRegressionWithLBFGS` for logistic regression model.
   - We abstract optimizer as one parameter of LogisticRegression. We have only 
one Estimator for each model and let users set different optimizers.
   
   I think option-2 is more intuitive, but we can talk offlline for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209256528


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionWithFtrl.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.feature.LabeledLargePointWithWeight;
+import org.apache.flink.ml.common.lossfunc.BinaryLogisticLoss;
+import org.apache.flink.ml.common.lossfunc.LossFunc;
+import org.apache.flink.ml.common.ps.training.IterationStageList;
+import org.apache.flink.ml.common.ps.training.ProcessStage;
+import org.apache.flink.ml.common.ps.training.PullStage;
+import org.apache.flink.ml.common.ps.training.PushStage;
+import org.apache.flink.ml.common.ps.training.SerializableConsumer;
+import org.apache.flink.ml.common.ps.training.TrainingContext;
+import org.apache.flink.ml.common.ps.training.TrainingUtils;
+import org.apache.flink.ml.common.updater.FTRL;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.ResettableIterator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableFunction;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Estimator which implements the large scale logistic regression algorithm 
using FTRL optimizer.
+ *
+ * See https://en.wikipedia.org/wiki/Logistic_regression.
+ */
+public class LogisticRegressionWithFtrl

Review Comment:
   Good point. The current implementation of `LogisticRegressionWithFtrl` and 
`LogisticRegression` employed different optimizers and I would like to update 
the implementation of `LogisticRegression` in a later PR.
   
   I would like to abstract an optimizer for existing implementation of 
LogisticRegression/LinearSVC, etc. There are two possible options:
   - For each optimizer for each model, we construct a new Estimator. For 
example `LogisticRegressionWithSGD` and `LogisticRegressionWithFTRL`, 
`LogisticRegressionWithLBFGS`, etc.
   - We abstract optimizer as one parameter of LogisticRegression. We have only 
one Estimator pair for each model and let users set different optimizers.
   
   I think option-2 is more intuitive, but we can talk offlline for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209256140


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+/**
+ * The hash implementation of the {@link BufferAccumulator}. The {@link 
BufferAccumulator} receives
+ * the records from {@link TieredStorageProducerClient} and the records will 
accumulate and
+ * transform to finished buffers. The finished buffers will be transferred to 
the corresponding tier
+ * dynamically.
+ */

Review Comment:
   Added detailed descriptions about the hash-mode accumulator, including the 
buffer requirement and the flush time.  And similar comments should also be 
added when implementing the sort-based accumulator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209254305


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SubpartitionHashBufferAccumulator.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SubpartitionHashBufferAccumulator} accumulates the records in a 
subpartition.
+ *
+ * Note that {@link #setup} need an argument of buffer flush listener to 
accept the finished
+ * accumulated buffers.
+ */
+public class SubpartitionHashBufferAccumulator {
+
+private final TieredStorageSubpartitionId subpartitionId;
+
+private final int bufferSize;
+
+private final HashBufferAccumulatorOperation bufferAccumulatorOperation;
+
+private BiConsumer> 
accumulatedBufferFlusher;
+
+// Not guarded by lock because it is expected only accessed from task's 
main thread.

Review Comment:
   Fixed, added the comments in class java doc.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorOperation.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+
+/**
+ * This interface is used by {@link SubpartitionHashBufferAccumulator} to 
operate {@link
+ * HashBufferAccumulator}.
+ */
+public interface HashBufferAccumulatorOperation {

Review Comment:
   Renamed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209254538


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SubpartitionHashBufferAccumulator.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SubpartitionHashBufferAccumulator} accumulates the records in a 
subpartition.
+ *
+ * Note that {@link #setup} need an argument of buffer flush listener to 
accept the finished
+ * accumulated buffers.
+ */
+public class SubpartitionHashBufferAccumulator {

Review Comment:
   Renamed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209253952


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SubpartitionHashBufferAccumulator.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SubpartitionHashBufferAccumulator} accumulates the records in a 
subpartition.
+ *
+ * Note that {@link #setup} need an argument of buffer flush listener to 
accept the finished
+ * accumulated buffers.
+ */
+public class SubpartitionHashBufferAccumulator {
+
+private final TieredStorageSubpartitionId subpartitionId;
+
+private final int bufferSize;
+
+private final HashBufferAccumulatorOperation bufferAccumulatorOperation;
+
+private BiConsumer> 
accumulatedBufferFlusher;

Review Comment:
   After removing the `setup` of the subpartitioin accumulator, it is marked as 
`final`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22501: [FLINK-31637][network] Implement the BufferAccumulator for the tiered storage

2023-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #22501:
URL: https://github.com/apache/flink/pull/22501#discussion_r1209253279


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+/**
+ * The hash implementation of the {@link BufferAccumulator}. The {@link 
BufferAccumulator} receives
+ * the records from {@link TieredStorageProducerClient} and the records will 
accumulate and
+ * transform to finished buffers. The finished buffers will be transferred to 
the corresponding tier
+ * dynamically.
+ */
+public class HashBufferAccumulator implements BufferAccumulator, 
HashBufferAccumulatorOperation {
+
+private final int bufferSize;
+
+private final TieredStorageMemoryManager storageMemoryManager;
+
+private SubpartitionHashBufferAccumulator[] 
subpartitionHashBufferAccumulators;

Review Comment:
   Fixed and checked it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209250004


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.updater;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A model updater that could be used to handle push/pull request from workers.
+ *
+ * Note that model updater should also ensure that model data is robust to 
failures.
+ */
+public interface ModelUpdater extends Serializable {
+
+/** Initialize the model data. */
+void open(long startFeatureIndex, long endFeatureIndex);
+
+/** Applies the push to update the model data, e.g., using gradient to 
update model. */
+void handlePush(long[] keys, double[] values);
+
+/** Applies the pull and return the retrieved model data. */
+double[] handlePull(long[] keys);
+
+/** Returns model pieces with the format of (startFeatureIdx, 
endFeatureIdx, modelValues). */
+Iterator> getModelPieces();

Review Comment:
   The model segments are continuously updated/retrieved by push/pull (i.e., 
`handlePush` and `handlePull`).
   
   I have renamed `pieces` as segments in the PR and also added the above 
description in the java doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209245957


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.updater;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A model updater that could be used to handle push/pull request from workers.
+ *
+ * Note that model updater should also ensure that model data is robust to 
failures.
+ */
+public interface ModelUpdater extends Serializable {
+
+/** Initialize the model data. */
+void open(long startFeatureIndex, long endFeatureIndex);
+
+/** Applies the push to update the model data, e.g., using gradient to 
update model. */
+void handlePush(long[] keys, double[] values);
+
+/** Applies the pull and return the retrieved model data. */
+double[] handlePull(long[] keys);

Review Comment:
   In this PR, we propose to use two type of roles to describe the iterative 
machine learning training process following the idea of parameter servers. 
   - WorkerOp stores the training data and only involves local computation 
logic. When it needs to access model parameters and involves distributed 
communication, it communicates with ServerOp via `push/pull` primitive. The 
`push/pull` could be sparse key-value pairs or dense values. Currently only 
sparse key-value are supported.
   - ServerOp stores the model parameters and provide access to WorkerOps.
   - Subtasks of WorkerOp cannot talk to each other. Subtasks of ServerOp 
cannot talk to each other.
   
   `handlePush` and `handlePull` are two operations that the server answers the 
request from workers.
   The naming follows the name of `push/pull`. It is possible that `handlePush` 
handle keys that have been updated with `handlePush`, but not necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #237: [Flink-27826] Support training very high dimensional logistic regression

2023-05-29 Thread via GitHub


zhipeng93 commented on code in PR #237:
URL: https://github.com/apache/flink-ml/pull/237#discussion_r1209245957


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/updater/ModelUpdater.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.updater;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A model updater that could be used to handle push/pull request from workers.
+ *
+ * Note that model updater should also ensure that model data is robust to 
failures.
+ */
+public interface ModelUpdater extends Serializable {
+
+/** Initialize the model data. */
+void open(long startFeatureIndex, long endFeatureIndex);
+
+/** Applies the push to update the model data, e.g., using gradient to 
update model. */
+void handlePush(long[] keys, double[] values);
+
+/** Applies the pull and return the retrieved model data. */
+double[] handlePull(long[] keys);

Review Comment:
   In this PR, we propose to use two type of roles to describe the iterative 
machine learning training process following the idea of parameter servers. 
   - WorkerOp stores the training data and only involves local computation 
logic. When it needs to access model parameters and involves distributed 
communication, it communicates with ServerOp via `push/pull` primitive. The 
`push/pull` could be sparse key-value pairs or dense values. Currently only 
sparse key-value are supported.
   - ServerOp stores the model parameters and provide access to WorkerOps.
   - Subtasks of WorkerOp cannot talk to each other. Subtasks of ServerOp 
cannot talk to each other.
   
   `handlePush` and `handlePull` are two operations that the server answers the 
request from workers.
   The naming following the name of `push/pull`. It is possible that 
`handlePush` handle keys that have been updated with `handlePush`, but not 
necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32194) Elasticsearch connector should remove the dependency on flink-shaded

2023-05-29 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-32194:
---
Fix Version/s: elasticsearch-4.0.0

> Elasticsearch connector should remove the dependency on flink-shaded
> 
>
> Key: FLINK-32194
> URL: https://issues.apache.org/jira/browse/FLINK-32194
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Affects Versions: elasticsearch-4.0.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-4.0.0
>
>
> The Elasticsearch connector depends on flink-shaded. With the externalization 
> of the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32194) Elasticsearch connector should remove the dependency on flink-shaded

2023-05-29 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-32194.

Resolution: Done

main via 71a8567bbd83a111df4f85d4465e4cda0ccae916.

> Elasticsearch connector should remove the dependency on flink-shaded
> 
>
> Key: FLINK-32194
> URL: https://issues.apache.org/jira/browse/FLINK-32194
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Affects Versions: elasticsearch-4.0.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> The Elasticsearch connector depends on flink-shaded. With the externalization 
> of the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-elasticsearch] boring-cyborg[bot] commented on pull request #65: [FLINK-32194][connectors/elasticsearch] Remove the dependency on flink-shaded

2023-05-29 Thread via GitHub


boring-cyborg[bot] commented on PR #65:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/65#issuecomment-1567033289

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] reswqa merged pull request #65: [FLINK-32194][connectors/elasticsearch] Remove the dependency on flink-shaded

2023-05-29 Thread via GitHub


reswqa merged PR #65:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/65


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-05-29 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727071#comment-17727071
 ] 

jirawech.s commented on FLINK-31689:


Anyone can help me review PR pls

https://github.com/apache/flink/pull/22400

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
>  Labels: pull-request-available
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] curcur commented on pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-05-29 Thread via GitHub


curcur commented on PR #22590:
URL: https://github.com/apache/flink/pull/22590#issuecomment-1566937767

   Some additional feedback as discussed offline:
   
   1. add a high-level layout overview of the directory with file merging 
enabled
   2. add some descriptions about the policy of physical file reuse 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-05-29 Thread Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727062#comment-17727062
 ] 

Liu commented on FLINK-30629:
-

[~Sergey Nuyanzin] I have tried multi times but can not reproduce it locally. I 
see that the time elapsed nearly 20+ second between the
ClientHeartbeatTest' start and the final error. In normal case, each case 
should be less than 2 second. I wonder whether the info log is printed in 
somewhere? It is hard to see the problem just from the code. We should more 
infos. Or can you give me some suggestions? Thanks.

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31992) FlinkKafkaConsumer API is suggested to use as part of documentation, when that API is deprecated for flink version 1.14

2023-05-29 Thread Jeyassri Balachandran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727057#comment-17727057
 ] 

Jeyassri Balachandran commented on FLINK-31992:
---

Hi [~sandeshmendan] , I'm new to the Apache Flink community. I'm eager to 
contribute. May I take this up?

> FlinkKafkaConsumer API is suggested to use as part of documentation, when 
> that API is deprecated for flink version 1.14
> ---
>
> Key: FLINK-31992
> URL: https://issues.apache.org/jira/browse/FLINK-31992
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.14.2
>Reporter: Sandesh Mendan
>Priority: Major
>  Labels: documentation, documentation-update, good-first-issue, 
> newbie
> Fix For: 1.14.7
>
>
> In Flink version 1.14, even though the API class FlinkKafkaConsumer had been 
> [deprecated|https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/],
>  the official 
> [documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector]
>  suggests that API to use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22674: [FLINK-32201][runtime]Automatically determine if the shuffle descriptor needs to be offloaded by the blob server based on the number of Shuf

2023-05-29 Thread via GitHub


flinkbot commented on PR #22674:
URL: https://github.com/apache/flink/pull/22674#issuecomment-1566794737

   
   ## CI report:
   
   * 19fda849b088b4d70ce03e6c650843ed12313651 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32201) Enable the distribution of shuffle descriptors via the blob server by connection number

2023-05-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32201:
---
Labels: pull-request-available  (was: )

> Enable the distribution of shuffle descriptors via the blob server by 
> connection number
> ---
>
> Key: FLINK-32201
> URL: https://issues.apache.org/jira/browse/FLINK-32201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> Flink support distributes shuffle descriptors via the blob server to reduce 
> JobManager overhead. But the default threshold to enable it is 1MB, which 
> never reaches. Users need to set a proper value for this, but it requires 
> advanced knowledge before configuring it.
> I would like to enable this feature by the number of connections of a group 
> of shuffle descriptors. For examples, a simple streaming job with two 
> operators, each with 10,000 parallelism and connected via all-to-all 
> distribution. In this job, we only get one set of shuffle descriptors, and 
> this group has 1 * 1 connections. This means that JobManager needs to 
> send this set of shuffle descriptors to 1 tasks.
> Since it is also difficult for users to configure, I would like to give it a 
> default value. The serialized shuffle descriptors sizes for different 
> parallelism are shown below.
> || Producer parallelism || serialized shuffle descriptor size || consumer 
> parallelism || total data size that JM needs to send ||
> | 5000 | 100KB | 5000 | 500MB |
> | 1 | 200KB | 1 | 2GB |
> | 2 | 400Kb | 2 | 8GB |
> So, I would like to set the default value to 10,000 * 10,000. 
> Any suggestions or concerns are appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] huwh opened a new pull request, #22674: [FLINK-32201][runtime]Automatically determine if the shuffle descriptor needs to be offloaded by the blob server based on the number of Shuffle

2023-05-29 Thread via GitHub


huwh opened a new pull request, #22674:
URL: https://github.com/apache/flink/pull/22674

   ## What is the purpose of the change
   
   Flink support distributes shuffle descriptors via the blob server to reduce 
JobManager overhead. But the default threshold to enable it is 1MB, which never 
reaches. Users need to set a proper value for this, but it requires advanced 
knowledge before configuring it.
   
   I would like to enable this feature by the number of connections of a group 
of shuffle descriptors. For examples, a simple streaming job with two 
operators, each with 10,000 parallelism and connected via all-to-all 
distribution. In this job, we only get one set of shuffle descriptors, and this 
group has 1 * 1 connections. This means that JobManager needs to send 
this set of shuffle descriptors to 1 tasks.
   
   Since it's difficult for users to configure, I would like to give it a 
default value. 
   
   ## Brief change log
 - *determine if the shuffle descriptor needs to be offloaded by the blob 
server based on the number of ShuffleDescriptor edges.*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Manually verified the change by running a cluster with 1 JobManagers 
and 2000 TaskManagers (10 slots per TaskManager), a streaming program with 
2 parallelism, and verifying that the task deploy was successful.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  (no)
 - The serializers:  (no)
 - The runtime per-record code paths (performance sensitive):  (no)
 - Anything that affects deployment or recovery: Task Deployment: (yes)
 - The S3 file system connector:  (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #22670: [FLINK-28386] Trigger an immediate checkpoint after all sinks finished

2023-05-29 Thread via GitHub


pnowojski commented on code in PR #22670:
URL: https://github.com/apache/flink/pull/22670#discussion_r1209080632


##
flink-runtime/src/main/java/org/apache/flink/runtime/sink/coordinator/SinkCoordinator.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.sink.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link
+ * org.apache.flink.api.connector.sink2.Sink}.
+ *
+ * The SinkCoordinator helps trigger an immediate global 
checkpoint when all sink
+ * operators are reaching end of data.
+ */
+@Internal
+public class SinkCoordinator implements OperatorCoordinator {

Review Comment:
   I have a couple of remarks about this construct:
   - What if some parts of a job don't have any sink? 
   - What will happen if Sink has it's own `Coordinator`? Can we have two 
coordinators at the same time?
   - I think this is pretty invasive design. We need to send extra events from 
the operators to JM, we need to expose triggering checkpoints to the 
coordinators, we have to wire everything. This touches, modifies and connects a 
lot of components. Why can not all of this happen purely in the JM, maybe just 
inside `CheckpointCoordinator`? We could add another 
`ExecutionState.WAITING_FOR_FINAL_CHECKPOINT` or maybe simply `FINISHING`, 
similarly how we added `ExecutionState.INITIALIZING` not that long time ago and 
just listen in the JM (or `CheckpointCoordinator`) until all subtasks have 
reached this new state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling

2023-05-29 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727041#comment-17727041
 ] 

RocMarshal commented on FLINK-31757:


I have compiled a draft  
[https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8/edit?usp=sharing]
Looking forward   your discussion.  [~fanrui] [~huwh] [~Weijie Guo] [~chesnay] 

> Optimize Flink un-balanced tasks scheduling
> ---
>
> Key: FLINK-31757
> URL: https://issues.apache.org/jira/browse/FLINK-31757
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
> Attachments: image-2023-04-13-08-04-04-667.png
>
>
> Supposed we have a Job with 21 {{JobVertex}}. The parallelism of vertex A is 
> 100, and the others are 5. If each {{TaskManager}} only have one slot, then 
> we need 100 TMs.
> There will be 5 slots with 21 sub-tasks, and the others will only have one 
> sub-task of A. Does this mean we have to make a trade-off between wasted 
> resources and insufficient resources?
> From a resource utilization point of view, we expect all subtasks to be 
> evenly distributed on each TM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31757) Optimize Flink un-balanced tasks scheduling

2023-05-29 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727041#comment-17727041
 ] 

RocMarshal edited comment on FLINK-31757 at 5/29/23 8:25 AM:
-

I have compiled a draft  
[https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8/edit?usp=sharing]
Looking forward   your discussion.  [~fanrui] [~huwh] [~Weijie Guo] [~chesnay] 
Thank you.


was (Author: rocmarshal):
I have compiled a draft  
[https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8/edit?usp=sharing]
Looking forward   your discussion.  [~fanrui] [~huwh] [~Weijie Guo] [~chesnay] 

> Optimize Flink un-balanced tasks scheduling
> ---
>
> Key: FLINK-31757
> URL: https://issues.apache.org/jira/browse/FLINK-31757
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
> Attachments: image-2023-04-13-08-04-04-667.png
>
>
> Supposed we have a Job with 21 {{JobVertex}}. The parallelism of vertex A is 
> 100, and the others are 5. If each {{TaskManager}} only have one slot, then 
> we need 100 TMs.
> There will be 5 slots with 21 sub-tasks, and the others will only have one 
> sub-task of A. Does this mean we have to make a trade-off between wasted 
> resources and insufficient resources?
> From a resource utilization point of view, we expect all subtasks to be 
> evenly distributed on each TM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22673: [BP-1.16][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-05-29 Thread via GitHub


flinkbot commented on PR #22673:
URL: https://github.com/apache/flink/pull/22673#issuecomment-1566714352

   
   ## CI report:
   
   * 916094433c3fb27a7dfa18e36ea52b20d17d3983 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myracle opened a new pull request, #22673: [BP-1.16][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-05-29 Thread via GitHub


Myracle opened a new pull request, #22673:
URL: https://github.com/apache/flink/pull/22673

   Backports [FLINK-32023](https://issues.apache.org/jira/browse/FLINK-32023) 
to release-1.16.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-05-29 Thread via GitHub


flinkbot commented on PR #22672:
URL: https://github.com/apache/flink/pull/22672#issuecomment-1566709340

   
   ## CI report:
   
   * babcbd0bac7c0f236d910cf2df724b3e21aa7e4c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myracle opened a new pull request, #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-05-29 Thread via GitHub


Myracle opened a new pull request, #22672:
URL: https://github.com/apache/flink/pull/22672

   Backports [FLINK-32023](https://issues.apache.org/jira/browse/FLINK-32023) 
to release-1.17.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22671: [FLINK-32211][sql-client] Supports row format in executor

2023-05-29 Thread via GitHub


flinkbot commented on PR #22671:
URL: https://github.com/apache/flink/pull/22671#issuecomment-1566659328

   
   ## CI report:
   
   * 35f84f5d9918c99cfc7f9941530bced0f7ae8402 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32211) Supports row format in ExecutorImpl for jdbc driver

2023-05-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32211:
---
Labels: pull-request-available  (was: )

> Supports row format in ExecutorImpl for jdbc driver
> ---
>
> Key: FLINK-32211
> URL: https://issues.apache.org/jira/browse/FLINK-32211
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Current ExecutorImpl only use RowFormat.PLAIN_TEXT for results, it should 
> support JSON for complex data type such as map/array for jdbc driver



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs opened a new pull request, #22671: [FLINK-32211][sql-client] Supports row format in executor

2023-05-29 Thread via GitHub


FangYongs opened a new pull request, #22671:
URL: https://github.com/apache/flink/pull/22671

   ## What is the purpose of the change
   
   This PR aims to add row format in `ExecutorImpl` and flink jdbc driver can 
use json format to get results.
   
   
   ## Brief change log
 - Add row format in `ExecutorImpl`
 - Use json format in jdbc driver
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`FlinkStatementTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32211) Supports row format in ExecutorImpl for jdbc driver

2023-05-29 Thread Fang Yong (Jira)
Fang Yong created FLINK-32211:
-

 Summary: Supports row format in ExecutorImpl for jdbc driver
 Key: FLINK-32211
 URL: https://issues.apache.org/jira/browse/FLINK-32211
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Affects Versions: 1.18.0
Reporter: Fang Yong


Current ExecutorImpl only use RowFormat.PLAIN_TEXT for results, it should 
support JSON for complex data type such as map/array for jdbc driver



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-05-29 Thread via GitHub


reswqa commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1566618712

   It seems that some conflicts need to be resolved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32023) execution.buffer-timeout cannot be set to -1 ms

2023-05-29 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32023:

Fix Version/s: 1.18.0

> execution.buffer-timeout cannot be set to -1 ms
> ---
>
> Key: FLINK-32023
> URL: https://issues.apache.org/jira/browse/FLINK-32023
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: Liu
>Assignee: Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The desc for execution.buffer-timeout is as following:
> {code:java}
> public static final ConfigOption BUFFER_TIMEOUT =
> ConfigOptions.key("execution.buffer-timeout")
> .durationType()
> .defaultValue(Duration.ofMillis(100))
> .withDescription(
> Description.builder()
> .text(
> "The maximum time frequency 
> (milliseconds) for the flushing of the output buffers. By default "
> + "the output buffers flush 
> frequently to provide low latency and to aid smooth developer "
> + "experience. Setting the 
> parameter can result in three logical modes:")
> .list(
> text(
> "A positive value triggers 
> flushing periodically by that interval"),
> text(
> FLUSH_AFTER_EVERY_RECORD
> + " triggers flushing 
> after every record thus minimizing latency"),
> text(
> 
> DISABLED_NETWORK_BUFFER_TIMEOUT
> + " ms triggers 
> flushing only when the output buffer is full thus maximizing "
> + "throughput"))
> .build()); {code}
> When we set execution.buffer-timeout to -1 ms, the following error is 
> reported:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1 ms' 
> for key 'execution.buffer-timeout'.
>     at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:856)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:822)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:224)
>     at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.(StreamContextEnvironment.java:51)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createStreamExecutionEnvironment(StreamExecutionEnvironment.java:1996)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1986)
>     at com.kuaishou.flink.examples.api.WordCount.main(WordCount.java:27)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:327)
>     ... 11 more
> Caused by: java.lang.NumberFormatException: text does not start with a number
>     at org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:78)
>     at 
> org.apache.flink.configuration.Configuration.convertToDuration(Configuration.java:1058)
>     at 
> org.apache.flink.configuration.Configuration.convertValue(Configuration.java:996)
>     at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:853)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:853)
>     ... 23 more {code}
> The reason is that the value for Duration can not be negative. We should 
> change the behavior or support to trigger flushing only when the output 
> buffer is full.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32023) execution.buffer-timeout cannot be set to -1 ms

2023-05-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727021#comment-17727021
 ] 

Rui Fan commented on FLINK-32023:
-

 522ff833ad7e3b3c80f4c1ba326cb05fdc4d6a3c

> execution.buffer-timeout cannot be set to -1 ms
> ---
>
> Key: FLINK-32023
> URL: https://issues.apache.org/jira/browse/FLINK-32023
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: Liu
>Assignee: Liu
>Priority: Major
>  Labels: pull-request-available
>
> The desc for execution.buffer-timeout is as following:
> {code:java}
> public static final ConfigOption BUFFER_TIMEOUT =
> ConfigOptions.key("execution.buffer-timeout")
> .durationType()
> .defaultValue(Duration.ofMillis(100))
> .withDescription(
> Description.builder()
> .text(
> "The maximum time frequency 
> (milliseconds) for the flushing of the output buffers. By default "
> + "the output buffers flush 
> frequently to provide low latency and to aid smooth developer "
> + "experience. Setting the 
> parameter can result in three logical modes:")
> .list(
> text(
> "A positive value triggers 
> flushing periodically by that interval"),
> text(
> FLUSH_AFTER_EVERY_RECORD
> + " triggers flushing 
> after every record thus minimizing latency"),
> text(
> 
> DISABLED_NETWORK_BUFFER_TIMEOUT
> + " ms triggers 
> flushing only when the output buffer is full thus maximizing "
> + "throughput"))
> .build()); {code}
> When we set execution.buffer-timeout to -1 ms, the following error is 
> reported:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1 ms' 
> for key 'execution.buffer-timeout'.
>     at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:856)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:822)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:224)
>     at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.(StreamContextEnvironment.java:51)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createStreamExecutionEnvironment(StreamExecutionEnvironment.java:1996)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1986)
>     at com.kuaishou.flink.examples.api.WordCount.main(WordCount.java:27)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:327)
>     ... 11 more
> Caused by: java.lang.NumberFormatException: text does not start with a number
>     at org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:78)
>     at 
> org.apache.flink.configuration.Configuration.convertToDuration(Configuration.java:1058)
>     at 
> org.apache.flink.configuration.Configuration.convertValue(Configuration.java:996)
>     at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:853)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:853)
>     ... 23 more {code}
> The reason is that the value for Duration can not be negative. We should 
> change the behavior or support to trigger flushing only when the output 
> buffer is full.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-29 Thread via GitHub


1996fanrui commented on PR #22560:
URL: https://github.com/apache/flink/pull/22560#issuecomment-1566601162

   Thanks everyone who discussed here, and @Myracle 's contribution, merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui merged pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-29 Thread via GitHub


1996fanrui merged PR #22560:
URL: https://github.com/apache/flink/pull/22560


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org