[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r381174084

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.influxdb.dto.Point;
+
+public class NoneStreamDAO implements INoneStreamDAO {
+    public static final String TAG_SERVICE_ID = "_service_id";
+    private static final int PADDING_SIZE = 1_000_000;
+    private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
+
+    private InfluxClient client;
+    private StorageBuilder storageBuilder;
+
+    public NoneStreamDAO(InfluxClient client, StorageBuilder storageBuilder) {
+        this.client = client;
+        this.storageBuilder = storageBuilder;
+    }
+
+    @Override
+    public void insert(final Model model, final NoneStream noneStream) throws IOException {
+        final long timestamp = TimeBucket.getTimestamp(
+            noneStream.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
+
+        Point point = new InfluxInsertRequest(model, noneStream, storageBuilder)
+            .time(timestamp, TimeUnit.NANOSECONDS)

Review comment:
All timestamps in InfluxDB are stored with nanosecond precision.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
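
The arithmetic behind the `insert` call in this hunk can be sketched as follows. This is a minimal illustration, not the plugin's code: it assumes that `TimeBucket.getTimestamp` yields epoch milliseconds (which is what multiplying by `PADDING_SIZE = 1_000_000` and passing `TimeUnit.NANOSECONDS` suggests), and it substitutes a plain `AtomicInteger` for SkyWalking's `AtomicRangeInteger`. The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: widen a millisecond timestamp to nanoseconds and add a rolling suffix. */
public class NanoTimestampSketch {
    // One millisecond holds 1_000_000 nanoseconds, so the suffix fits in the sub-millisecond digits.
    private static final int PADDING_SIZE = 1_000_000;
    // Stand-in for AtomicRangeInteger (assumption): a counter folded into [0, PADDING_SIZE).
    private static final AtomicInteger SUFFIX = new AtomicInteger();

    static long toUniqueNanos(long epochMillis) {
        int suffix = Math.floorMod(SUFFIX.getAndIncrement(), PADDING_SIZE);
        return TimeUnit.MILLISECONDS.toNanos(epochMillis) + suffix;
    }

    public static void main(String[] args) {
        long millis = 1_582_000_000_000L; // an arbitrary instant in February 2020
        long first = toUniqueNanos(millis);
        long second = toUniqueNanos(millis);
        // Both values map back to the same millisecond but differ as nanosecond timestamps.
        System.out.println(first + " -> " + TimeUnit.NANOSECONDS.toMillis(first));
        System.out.println(second + " -> " + TimeUnit.NANOSECONDS.toMillis(second));
    }
}
```

InfluxDB treats points in the same series that carry an identical timestamp as the same point, so a suffix of this kind is one way to keep records that share a time bucket from overwriting each other.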
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380509427

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java ##
@@ -95,7 +96,7 @@ public ProfileTask getById(final String id) throws IOException {
         WhereQueryImpl query = select("ID", ProfileTaskRecord.SERVICE_ID,
                                       ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
                                       ProfileTaskRecord.CREATE_TIME,
-                                      ProfileTaskRecord.DURATION,
+                                      "\"" + ProfileTaskRecord.DURATION + "\"", // scape, the 'duration' is identifier

Review comment:
Check `ModelInstaller#overrideColumnName`, you could replace the name in the storage implementation. That is a more elegant way.
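
The difference between the two approaches can be sketched as below. Quoting `duration` at every query site works because `DURATION` is a keyword in InfluxQL, but the rename suggested in the review keeps that knowledge in one place. The sketch is self-contained and hypothetical: `ColumnNameOverrideSketch`, its methods, and the replacement name `dur` are illustrative stand-ins, and nothing here is the actual signature of `ModelInstaller#overrideColumnName`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch: map logical column names to storage-safe names in one place. */
public class ColumnNameOverrideSketch {
    private final Map<String, String> overrides = new LinkedHashMap<>();

    // Register a storage-specific name for a logical column (illustrative, not the SkyWalking API).
    void overrideColumnName(String columnName, String storageName) {
        overrides.put(columnName, storageName);
    }

    // Columns without an override keep their logical name.
    String storageName(String columnName) {
        return overrides.getOrDefault(columnName, columnName);
    }

    // Build the column list of a SELECT statement from logical names.
    String selectColumns(String... columnNames) {
        StringJoiner joiner = new StringJoiner(", ");
        for (String name : columnNames) {
            joiner.add(storageName(name));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        ColumnNameOverrideSketch names = new ColumnNameOverrideSketch();
        names.overrideColumnName("duration", "dur");
        // No per-query escaping is needed once the keyword is renamed.
        System.out.println(names.selectColumns("service_id", "start_time", "duration"));
    }
}
```

With a mapping like this, query code refers only to logical names, and the storage plugin decides once how a conflicting name is stored.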
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380443678

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/installer/MetaTableDefine.java ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.installer;
+
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
+
+/**
+ * Here defines which table is stored in metadata database(H2/MySQL).
+ */
+public class MetaTableDefine {
+
+    /**
+     * Test a {@link Model} is stored in H2/MySQL or not.
+     *
+     * @param model Model
+     * @return true if the {@link Model} is stored in H2/MySQL
+     */
+    public static boolean contains(Model model) {
+        switch (model.getScopeId()) {
+            case SERVICE_INVENTORY:
+            case SERVICE_INSTANCE_INVENTORY:
+            case NETWORK_ADDRESS:
+            case ENDPOINT_INVENTORY:
+            case PROFILE_TASK:

Review comment:
And `PROFILE_TASK` is not a metadata entity.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380443588

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/installer/MetaTableDefine.java ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.installer;
+
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
+
+/**
+ * Here defines which table is stored in metadata database(H2/MySQL).
+ */
+public class MetaTableDefine {
+
+    /**
+     * Test a {@link Model} is stored in H2/MySQL or not.
+     *
+     * @param model Model
+     * @return true if the {@link Model} is stored in H2/MySQL
+     */
+    public static boolean contains(Model model) {
+        switch (model.getScopeId()) {
+            case SERVICE_INVENTORY:
+            case SERVICE_INSTANCE_INVENTORY:
+            case NETWORK_ADDRESS:
+            case ENDPOINT_INVENTORY:
+            case PROFILE_TASK:

Review comment:
Use `model#capableOfTimeSeries` == true check. If YES, then it is a metadata table.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380077043

## File path: test/e2e/e2e-ttl/e2e-ttl-influxdb/src/test/java/org/apache/skywalking/e2e/StorageTTLITCase.java ##
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.e2e;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.MeshProbeDownstream;
+import org.apache.skywalking.apm.network.servicemesh.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetricServiceGrpc;
+import org.apache.skywalking.e2e.metrics.AllOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.Metrics;
+import org.apache.skywalking.e2e.metrics.MetricsQuery;
+import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
+import org.apache.skywalking.e2e.service.Service;
+import org.apache.skywalking.e2e.service.ServicesQuery;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.skywalking.e2e.metrics.MetricsQuery.SERVICE_RESP_TIME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author kezhenxu94
+ */

Review comment:
I think we should do that. Shall we create an issue to track this?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380054244

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/installer/MetaTableDefine.java ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.installer;
+
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
+
+/**
+ * Here defines which table is stored in metadata database(H2/MySQL).
+ */
+public class MetaTableDefine {
+
+    /**
+     * Test a {@link Model} is stored in H2/MySQL or not.
+     *
+     * @param model Model
+     * @return true if the {@link Model} is stored in H2/MySQL
+     */
+    public static boolean contains(Model model) {
+        switch (model.getScopeId()) {
+            case SERVICE_INVENTORY:
+            case SERVICE_INSTANCE_INVENTORY:
+            case NETWORK_ADDRESS:
+            case ENDPOINT_INVENTORY:
+            case PROFILE_TASK:

Review comment:
What is the principle? Could you add some comments and guidelines about this?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380046157

## File path: test/e2e/e2e-ttl/e2e-ttl-influxdb/src/test/java/org/apache/skywalking/e2e/StorageTTLITCase.java ##
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.e2e;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.MeshProbeDownstream;
+import org.apache.skywalking.apm.network.servicemesh.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetricServiceGrpc;
+import org.apache.skywalking.e2e.metrics.AllOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
+import org.apache.skywalking.e2e.metrics.Metrics;
+import org.apache.skywalking.e2e.metrics.MetricsQuery;
+import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
+import org.apache.skywalking.e2e.service.Service;
+import org.apache.skywalking.e2e.service.ServicesQuery;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.skywalking.e2e.metrics.MetricsQuery.SERVICE_RESP_TIME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author kezhenxu94
+ */

Review comment:
Why isn't this detected by the CI process?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r380044341 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.query.entity.Order; +import org.apache.skywalking.oap.server.core.query.entity.TopNEntity; +import org.apache.skywalking.oap.server.core.register.EndpointInventory; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; +import org.apache.skywalking.oap.server.core.storage.model.ModelName; +import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.SelectSubQueryImpl; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; + +@Slf4j +public class AggregationQuery implements IAggregationQueryDAO { +private InfluxClient client; + +public AggregationQuery(InfluxClient client) { +this.client = client; +} + +@Override +public List getServiceTopN(String indName, String valueCName, int topN, Downsampling downsampling, + long startTB, long endTB, Order order) throws IOException { +return getTopNEntity(downsampling, indName, subQuery(indName, valueCName, startTB, endTB), order, topN); +} + +@Override +public List getAllServiceInstanceTopN(String indName, String valueCName, int topN, + Downsampling downsampling, + long startTB, long endTB, 
Order order) throws IOException { +return getTopNEntity(downsampling, indName, subQuery(indName, valueCName, startTB, endTB), order, topN); +} + +@Override +public List getServiceInstanceTopN(int serviceId, String indName, String valueCName, int topN, + Downsampling downsampling, + long startTB, long endTB, Order order) throws IOException { +return getTopNEntity( +downsampling, indName, +subQuery(ServiceInstanceInventory.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN +); +} + +@Override +public List getAllEndpointTopN(String indName, String valueCName, int topN, Downsampling downsampling, + long startTB, long endTB, Order order) throws IOException { +return getTopNEntity(downsampling, indName, subQuery(indName, valueCName, startTB, endTB), order, topN); +} + +@Override +public List getEndpointTopN(int serviceId, String indName, String valueCName, int topN, +Downsampling downsampling, +long startTB, long endTB, Order order) throws IOException { +return getTopNEntity( +downsampling, indName, +subQuery(EndpointInventory.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN +); +} + +private List getTopNEntity(Downsampling downsampling, +
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r380040883

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/installer/MetaTableDefine.java ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.installer;
+
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
+
+/**
+ * Here defines which table is stored in metadata database(H2/MySQL).
+ */
+public class MetaTableDefine {
+
+    /**
+     * Test a {@link Model} is stored in H2/MySQL or not.
+     *
+     * @param model Model
+     * @return true if the {@link Model} is stored in H2/MySQL
+     */
+    public static boolean contains(Model model) {
+        switch (model.getScopeId()) {
+            case SERVICE_INVENTORY:
+            case SERVICE_INSTANCE_INVENTORY:
+            case NETWORK_ADDRESS:
+            case ENDPOINT_INVENTORY:
+            case PROFILE_TASK:

Review comment:
Another entry is missing here: `PROFILE_TASK_LOG`. Could you check why the tests still passed even though it is missing? Does InfluxDB have the profile e2e test?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r379903387

## File path: .github/workflows/e2e.yaml ##
@@ -71,15 +73,19 @@ jobs:
           ./mvnw --batch-mode -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install
           ./mvnw --batch-mode -f test/e2e/pom.xml -pl e2e-base clean install
       - name: Cluster Tests (ES6/ZK/JDK8)
-        run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner --storage=elasticsearch
+      - name: Cluster Tests (InfluxDB/ZK/JDK8)
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner --storage=influxdb
       - name: Cluster With Gateway Tests (ES6/ZK/JDK8)
-        run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-cluster-with-gateway/e2e-cluster-with-gateway-test-runner
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster-with-gateway/e2e-cluster-with-gateway-test-runner

Review comment:
I am OK with keeping the current status.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r379900290

## File path: .github/workflows/e2e.yaml ##
@@ -71,15 +73,19 @@ jobs:
           ./mvnw --batch-mode -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install
           ./mvnw --batch-mode -f test/e2e/pom.xml -pl e2e-base clean install
       - name: Cluster Tests (ES6/ZK/JDK8)
-        run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner --storage=elasticsearch
+      - name: Cluster Tests (InfluxDB/ZK/JDK8)
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster/e2e-cluster-test-runner --storage=influxdb
       - name: Cluster With Gateway Tests (ES6/ZK/JDK8)
-        run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-cluster-with-gateway/e2e-cluster-with-gateway-test-runner
+        run: export E2E_VERSION=jdk8-1.5 && bash -x test/e2e/run.sh e2e-cluster-with-gateway/e2e-cluster-with-gateway-test-runner

Review comment:
Why doesn't this step have `--storage=elasticsearch`? What is the default, and what happens when `--storage=` is omitted? I would expect a failure when this parameter is missing.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r378846947 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * InfluxDB connection maintainer, provides base data write/query API. + */ +@Slf4j +public class InfluxClient implements Client { +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), + new OkHttpClient.Builder().readTimeout(3, TimeUnit.MINUTES) + .writeTimeout(3, TimeUnit.MINUTES), + InfluxDB.ResponseFormat.MSGPACK +); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS); +influx.setDatabase(database); +} + +/** + * To get a connection of InfluxDB. 
+ * + * @return InfluxDB's connection + */ +private InfluxDB getInflux() { +return influx; +} + +/** + * Execute a query against InfluxDB and return a set of {@link QueryResult.Result}s. Normally, InfluxDB supports + * combining multiple statements into one query, so that we do get multi-results. + * + * @param query Query + * @return a set of {@link QueryResult.Result}s. + * @throws IOException if there is an error on the InfluxDB server or communication error. + */ +public List query(Query query) throws IOException { +if (log.isDebugEnabled()) { +log.debug("SQL Statement: {}", query.getCommand()); +} + +try { +QueryResult result = getInflux().query(query); +if (result.hasError()) { +throw new IOException(result.getError()); +} +return result.getResults(); +} catch (Exception e) { +throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e); +} +} + +/** + * Execute a query against InfluxDB with a single statement. + * + * @param query Query + * @return a set of {@link QueryResult.Series}s + * @throws IOException if there is an error on the InfluxDB server or communication error + */ +public List queryForSeries(Query query) throws IOException { +List results = query(query); + +if (CollectionUtils.isEmpty(results)) { +
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r378850400 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/PointBuilder.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger; +import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; +import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; +import org.apache.skywalking.oap.server.core.storage.StorageData; +import org.apache.skywalking.oap.server.core.storage.model.ColumnName; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataType; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.influxdb.dto.Point; + +import static org.apache.skywalking.oap.server.core.analysis.TimeBucket.getTimestamp; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.DATABASE_SLOW_STATEMENT; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.HTTP_ACCESS_LOG; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_LOG; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT; +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SEGMENT; + +/** + * A helper help to build a InfluxDB 
Point from StorageData.
+ */
+public class PointBuilder {

Review comment: Could you explain this class a little more? I am a little confused. All existing entities share the `prepareBatchInsert` and `prepareBatchUpdate` logic without requiring an explicit entity name. Using literal string names here is highly unstable: these names could be changed in any PR, and new entities could be added.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r378012577

## File path: .github/workflows/e2e.yaml ##
@@ -39,19 +39,21 @@ jobs:
 ./mvnw --batch-mode -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install
 ./mvnw --batch-mode -f test/e2e/pom.xml -pl e2e-base clean install
 - name: Single Node Tests(JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-single-service

Review comment: Upgrade to 1.5 please, based on https://github.com/apache/skywalking/issues/4343#issuecomment-584990302
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r378009922

## File path: .github/workflows/e2e.yaml ##
@@ -39,19 +39,21 @@ jobs:
 ./mvnw --batch-mode -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install
 ./mvnw --batch-mode -f test/e2e/pom.xml -pl e2e-base clean install
 - name: Single Node Tests(JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-single-service
+ - name: Single Node Tests(InfluxDB/JDK8)
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-influxdb
 - name: Single Node Tests(MySQL/JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-mysql
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-mysql
 - name: Single Node Tests(JDK9)
-run: export E2E_VERSION=jdk9-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk9-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Single Node Tests(JDK11)
-run: export E2E_VERSION=jdk11-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk11-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Single Node Tests(JDK12)
-run: export E2E_VERSION=jdk12-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk12-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Agent Reboot Tests(JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-agent-reboot
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-agent-reboot
- Cluster:
+ Cluster_N_TTL:

Review comment: Three things:
1. Using `N` to represent AND is not common. `+` and `&` are better.
2. This name should not be changed arbitrarily. `Cluster` is a required test; without this check (named `cluster`), we can't merge the PR unless we ask INFRA to change the settings.
3. If you want the test group names to be clearer, moving the TTL out of the cluster test group makes sense to me. But note that it will take more time, as it requires compiling the project again.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r378004330

## File path: .github/workflows/e2e.yaml ##
@@ -39,19 +39,21 @@ jobs:
 ./mvnw --batch-mode -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install
 ./mvnw --batch-mode -f test/e2e/pom.xml -pl e2e-base clean install
 - name: Single Node Tests(JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-single-service
+ - name: Single Node Tests(InfluxDB/JDK8)
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-influxdb
 - name: Single Node Tests(MySQL/JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-mysql
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-mysql
 - name: Single Node Tests(JDK9)
-run: export E2E_VERSION=jdk9-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk9-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Single Node Tests(JDK11)
-run: export E2E_VERSION=jdk11-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk11-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Single Node Tests(JDK12)
-run: export E2E_VERSION=jdk12-1.3 && bash -x test/e2e/run.sh e2e-single-service
+run: export E2E_VERSION=jdk12-1.4 && bash -x test/e2e/run.sh e2e-single-service
 - name: Agent Reboot Tests(JDK8)
-run: export E2E_VERSION=jdk8-1.3 && bash -x test/e2e/run.sh e2e-agent-reboot
+run: export E2E_VERSION=jdk8-1.4 && bash -x test/e2e/run.sh e2e-agent-reboot
- Cluster:
+ Cluster_N_TTL:

Review comment: What do you mean by `N_TTL`?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r376746175 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * InfluxDB connection maintainer, provides base data write/query API. + */ +@Slf4j +public class InfluxClient implements Client { +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS); +influx.setDatabase(database); +} + +/** + * To get a connection of InfluxDB. 
+ *
+ * @return InfluxDB's connection
+ */
+public InfluxDB getInflux() {

Review comment: I think this should be `private` to avoid misuse.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r376711649

## File path: test/e2e/e2e-ttl/e2e-ttl-influxdb/src/main/proto/common/CLR.proto ##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+syntax = "proto3";

Review comment: All these protos should be removed; please follow https://github.com/apache/skywalking/pull/4329#issue-372713174
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r376689951

## File path: docs/en/setup/backend/backend-storage.md ##
@@ -226,6 +227,27 @@ storage:
 All connection related settings including link url, username and password are in `application.yml`. These settings can refer to the configuration of *MySQL* above.
+## InfluxDB
+InfluxDB as storage since SkyWalking 7.0. It depends on `H2/MySQL` storage-plugin to store `metadata` likes `Inventory` and `ProfileTask`. So, when we set `InfluxDB` as storage provider, we need to configure `InfluxDB`'s properties and `H2/MySQL`.

Review comment: `likes` -> `like`. `we need to configure InfluxDB's properties and H2/MySQL` -> `We need to configure properties of InfluxDB and Metabase`.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r376689989

## File path: docs/en/setup/backend/backend-storage.md ##
@@ -226,6 +227,27 @@ storage:
 All connection related settings including link url, username and password are in `application.yml`. These settings can refer to the configuration of *MySQL* above.
+## InfluxDB
+InfluxDB as storage since SkyWalking 7.0. It depends on `H2/MySQL` storage-plugin to store `metadata` likes `Inventory` and `ProfileTask`. So, when we set `InfluxDB` as storage provider, we need to configure `InfluxDB`'s properties and `H2/MySQL`.
+
+```yaml
+storage
+ influx:
+# Metadata storage provider configuration
+metabaseType: ${SW_STORAGE_METABASE_TYPE:H2}

Review comment: Please provide type options.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r376276636 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.storage.IBatchDAO; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.influxdb.dto.BatchPoints; + +@Slf4j +public class BatchDAO implements IBatchDAO { +private final DataCarrier dataCarrier; +private final InfluxClient client; + +public BatchDAO(InfluxClient client) { +this.client = client; + +String name = "INFLUX_ASYNC_BATCH_PERSISTENT"; +BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20L); + +try { +ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); +} catch (Exception e) { +throw new UnexpectedException(e.getMessage(), e); +} + +this.dataCarrier = new DataCarrier(1, 1); +this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new InfluxBatchConsumer(this)); +} + +@Override +public void asynchronous(InsertRequest insertRequest) { +dataCarrier.produce(insertRequest); Review comment: Then you should not require datacarrier? Right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375229729 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * + */ +public class InfluxClient implements Client { +private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.setDatabase(database); +influx.enableBatch(); +} + +/** + * To get a connection of InfluxDB + * + * @return InfluxDB's connection + */ +public InfluxDB getInflux() { +return influx; +} + +/** + * Request with a {@link Query} to InfluxDB and return a set of {@link QueryResult.Result}s. 
+ *
+ * @param query
+ * @return a set of Result.
+ * @throws IOException
+ */
+public List<QueryResult.Result> query(Query query) throws IOException {
+if (logger.isDebugEnabled()) {
+logger.debug("SQL Statement: {}", query.getCommand());
+}
+
+try {
+QueryResult result = getInflux().query(query);
+if (result.hasError()) {
+throw new IOException(result.getError());
+}
+return result.getResults();
+} catch (Exception e) {
+throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e);
+}
+}
+
+/**
+ * Request with one statement to InfluxDB and return a set of {@link QueryResult.Series}s.
+ *
+ * @param query
+ * @return a set of Series
+ * @throws IOException
+ */
+public List<QueryResult.Series> queryForSeries(Query query) throws IOException {
+return query(query).get(0).getSeries();

Review comment: Could we add a `#singleStatementQuery` method for most cases?
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375131166 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * + */ +public class InfluxClient implements Client { +private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.setDatabase(database); +influx.enableBatch(); +} + +/** + * To get a connection of InfluxDB + * + * @return InfluxDB's connection + */ +public InfluxDB getInflux() { +return influx; +} + +/** + * Request with a {@link Query} to InfluxDB and return a set of {@link QueryResult.Result}s. 
+ *
+ * @param query
+ * @return a set of Result.
+ * @throws IOException
+ */
+public List<QueryResult.Result> query(Query query) throws IOException {
+if (logger.isDebugEnabled()) {
+logger.debug("SQL Statement: {}", query.getCommand());
+}
+
+try {
+QueryResult result = getInflux().query(query);
+if (result.hasError()) {
+throw new IOException(result.getError());
+}
+return result.getResults();
+} catch (Exception e) {
+throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e);
+}
+}
+
+/**
+ * Request with one statement to InfluxDB and return a set of {@link QueryResult.Series}s.
+ *
+ * @param query
+ * @return a set of Series
+ * @throws IOException
+ */
+public List<QueryResult.Series> queryForSeries(Query query) throws IOException {
+return query(query).get(0).getSeries();

Review comment: Do we use multiple statements? I think `#get(0)` should be included in `#query` (returning series). If we have a multiple-statement case, we should have a different API.
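
For illustration only, the single-statement API suggested in the comment above could look roughly like the sketch below. It is not the code from the PR: `StatementResult` and `seriesOfSingleStatement` are hypothetical stand-ins (the real plugin would use the `QueryResult.Result` and `QueryResult.Series` types from influxdb-java), and the only point shown is that the `get(0)` step lives in one place and that an empty or missing result yields an empty list instead of an exception.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingleStatementQuerySketch {

    /** Hypothetical stand-in for one statement's result; not the influxdb-java type. */
    public static final class StatementResult<S> {
        private final List<S> series; // null when the statement produced no series

        public StatementResult(List<S> series) {
            this.series = series;
        }

        public List<S> getSeries() {
            return series;
        }
    }

    /**
     * Returns the series of the first (and, by assumption, only) statement,
     * or an empty list when there is no result or no series at all.
     */
    public static <S> List<S> seriesOfSingleStatement(List<StatementResult<S>> results) {
        if (results == null || results.isEmpty()) {
            return Collections.<S>emptyList();
        }
        List<S> series = results.get(0).getSeries();
        return series == null ? Collections.<S>emptyList() : series;
    }

    public static void main(String[] args) {
        // One statement that returned two series.
        List<StatementResult<String>> one =
            Collections.singletonList(new StatementResult<>(Arrays.asList("cpu", "mem")));
        System.out.println(seriesOfSingleStatement(one));

        // No results at all: the helper returns an empty list rather than throwing.
        List<StatementResult<String>> none = Collections.emptyList();
        System.out.println(seriesOfSingleStatement(none));
    }
}
```

With a helper of this shape, callers that only ever send one statement would not need to index into the result list themselves, and a multi-statement case could get its own, separate method.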
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375122894 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataType; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.WhereQueryImpl; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; + +public class MetricsDAO implements IMetricsDAO { +private final StorageBuilder storageBuilder; +private final InfluxClient client; + +public MetricsDAO(InfluxClient client, StorageBuilder storageBuilder) { +this.client = client; +this.storageBuilder = storageBuilder; +} + +@Override +public List multiGet(Model model, List ids) throws IOException { +WhereQueryImpl query = select("*::field") +.from(client.getDatabase(), model.getName()) +.where(contains("id", Joiner.on("|").join(ids))); +List series = client.queryForSeries(query); +if (series == null || series.isEmpty()) { +return Collections.emptyList(); +} + +final List metrics = Lists.newArrayList(); +List columns = series.get(0).getColumns(); +Map storageAndColumnNames = 
Maps.newHashMap(); +for (ModelColumn column : model.getColumns()) { +storageAndColumnNames.put(column.getColumnName().getName(), column.getColumnName().getStorageName()); +} + +series.get(0).getValues().forEach(values -> { Review comment: Another `#get(0)`. What does this mean?
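One possible reading of the `series.get(0)` questioned above: the code assumes the query returns a single series and then walks its rows, matching each value to its column name. The sketch below illustrates only that row-to-column pairing. `SeriesRows` and `toRows` are hypothetical names, and the two parameters stand in for what the quoted code reads through `getColumns()` and `getValues()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: pairs the rows of one series with its column names. */
final class SeriesRows {

    /**
     * columns: the column names of the series (assumed to include "time").
     * rows: one list of values per point, in the same order as columns.
     */
    static List<Map<String, Object>> toRows(List<String> columns, List<List<Object>> rows) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (List<Object> row : rows) {
            // LinkedHashMap keeps the column order of the series.
            Map<String, Object> byName = new LinkedHashMap<>();
            for (int i = 0; i < columns.size(); i++) {
                byName.put(columns.get(i), row.get(i));
            }
            out.add(byName);
        }
        return out;
    }
}
```

If a query could ever return more than one series (for example when grouping by a tag), taking only the first one would silently drop data, which may be why the reviewer asks for the assumption to be made explicit.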
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375121308 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.storage.IBatchDAO; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.influxdb.dto.BatchPoints; + +@Slf4j +public class BatchDAO implements IBatchDAO { +private final DataCarrier dataCarrier; +private final InfluxClient client; + +public BatchDAO(InfluxClient client) { +this.client = client; + +String name = "INFLUX_ASYNC_BATCH_PERSISTENT"; +BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20L); + +try { +ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); +} catch (Exception e) { +throw new UnexpectedException(e.getMessage(), e); +} + +this.dataCarrier = new DataCarrier(1, 1); +this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new InfluxBatchConsumer(this)); +} + +@Override +public void asynchronous(InsertRequest insertRequest) { +dataCarrier.produce(insertRequest); +} + +@Override +public void synchronous(List prepareRequests) { +if (CollectionUtils.isEmpty(prepareRequests)) { +return; +} + +if (log.isDebugEnabled()) { +log.debug("batch sql statements execute, data size: {}", prepareRequests.size()); +} + +final BatchPoints.Builder builder = BatchPoints.builder(); +prepareRequests.forEach(e -> { 
+builder.point(((InfluxInsertRequest)e).getPoint()); +}); + +client.write(builder.build()); Review comment: Is this a blocking write? That is, is the data queryable once this method has finished?
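The distinction behind the question above can be sketched with a toy store. This is an illustration of the two semantics only, under the assumption that "blocking" means the point is visible when the call returns. `WriteModes` and all of its methods are hypothetical and do not model how InfluxDB or its Java client actually behaves.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory store; it does not model InfluxDB itself. */
final class WriteModes {
    private final List<String> stored = new ArrayList<>(); // visible to queries
    private final List<String> buffer = new ArrayList<>(); // pending, not yet visible

    /** Blocking write: the point is stored before the method returns. */
    synchronized void writeBlocking(String point) {
        stored.add(point);
    }

    /** Buffered write: returns at once; the point is visible only after flush(). */
    synchronized void writeBuffered(String point) {
        buffer.add(point);
    }

    /** Moves every pending point into the queryable store. */
    synchronized void flush() {
        stored.addAll(buffer);
        buffer.clear();
    }

    synchronized boolean isQueryable(String point) {
        return stored.contains(point);
    }
}
```

A `synchronous(...)` method that must guarantee read-after-write needs the first behaviour. If the underlying client buffers, the method would have to flush before returning.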
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375122025 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.storage.IBatchDAO; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.influxdb.dto.BatchPoints; + +@Slf4j +public class BatchDAO implements IBatchDAO { +private final DataCarrier dataCarrier; +private final InfluxClient client; + +public BatchDAO(InfluxClient client) { +this.client = client; + +String name = "INFLUX_ASYNC_BATCH_PERSISTENT"; +BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20L); + +try { +ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); +} catch (Exception e) { +throw new UnexpectedException(e.getMessage(), e); +} + +this.dataCarrier = new DataCarrier(1, 1); +this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new InfluxBatchConsumer(this)); +} + +@Override +public void asynchronous(InsertRequest insertRequest) { +dataCarrier.produce(insertRequest); Review comment: Doesn't InfluxDB have an async write mode? As I recall, time-series databases are good at writes.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375120216 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageConfig.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +@Setter +@Getter +public class InfluxStorageConfig extends ModuleConfig { Review comment: Comments are required. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375117861 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * Review comment: Empty comment? 
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375122463 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * + */ +public class InfluxClient implements Client { +private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.setDatabase(database); +influx.enableBatch(); +} + +/** + * To get a connection of InfluxDB + * + * @return InfluxDB's connection + */ +public InfluxDB getInflux() { +return influx; +} + +/** + * Request with a {@link Query} to InfluxDB and return a set of {@link QueryResult.Result}s. 
+ * + * @param query + * @return a set of Result. + * @throws IOException + */ +public List query(Query query) throws IOException { +if (logger.isDebugEnabled()) { +logger.debug("SQL Statement: {}", query.getCommand()); +} + +try { +QueryResult result = getInflux().query(query); +if (result.hasError()) { +throw new IOException(result.getError()); +} +return result.getResults(); +} catch (Exception e) { +throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e); +} +} + +/** + * Request with one statement to InfluxDB and return a set of {@link QueryResult.Series}s. + * + * @param query + * @return a set of Series + * @throws IOException + */ +public List queryForSeries(Query query) throws IOException { +return query(query).get(0).getSeries(); +} + +/** + * Data management, to drop a time-series by measurement and time-series name specified. If an exception isn't + * thrown, it means execution success. + * + * @param measurement + * @param timeBucket + * @throws IOException + */ +public void dropSeries(String measurement, long timeBucket) throws IOException { +Query query = new Query("DROP SERIES FROM " + measurement + " WHERE time_bucket='" + timeBucket + "'"); Review comment: `=`? This is an automated message from the Apache Git
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375118648 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * + */ +public class InfluxClient implements Client { +private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.setDatabase(database); +influx.enableBatch(); +} + +/** + * To get a connection of InfluxDB + * + * @return InfluxDB's connection + */ +public InfluxDB getInflux() { +return influx; +} + +/** + * Request with a {@link Query} to InfluxDB and return a set of {@link QueryResult.Result}s. 
+ * + * @param query + * @return a set of Result. + * @throws IOException + */ +public List query(Query query) throws IOException { +if (logger.isDebugEnabled()) { +logger.debug("SQL Statement: {}", query.getCommand()); +} + +try { +QueryResult result = getInflux().query(query); +if (result.hasError()) { +throw new IOException(result.getError()); +} +return result.getResults(); +} catch (Exception e) { +throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e); +} +} + +/** + * Request with one statement to InfluxDB and return a set of {@link QueryResult.Series}s. + * + * @param query + * @return a set of Series + * @throws IOException + */ +public List queryForSeries(Query query) throws IOException { +return query(query).get(0).getSeries(); Review comment: What does this `#get(0)` represent? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375119872 ## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import okhttp3.OkHttpClient; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.library.client.Client; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.time.TimeInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti; + +/** + * + */ +public class InfluxClient implements Client { +private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +private InfluxStorageConfig config; +private InfluxDB influx; + +/** + * A constant, the name of time field in Time-series database. + */ +public static final String TIME = "time"; +/** + * A constant, the name of tag. + */ +public static final String TAG_ENTITY_ID = "entity_id"; + +private final String database; + +public InfluxClient(InfluxStorageConfig config) { +this.config = config; +this.database = config.getDatabase(); +} + +public final String getDatabase() { +return database; +} + +@Override +public void connect() { +influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(), +new OkHttpClient.Builder(), InfluxDB.ResponseFormat.MSGPACK); +influx.query(new Query("CREATE DATABASE " + database)); + +influx.setDatabase(database); +influx.enableBatch(); +} + +/** + * To get a connection of InfluxDB + * + * @return InfluxDB's connection + */ +public InfluxDB getInflux() { +return influx; +} + +/** + * Request with a {@link Query} to InfluxDB and return a set of {@link QueryResult.Result}s. 
+ * + * @param query + * @return a set of Result. + * @throws IOException + */ +public List query(Query query) throws IOException { +if (logger.isDebugEnabled()) { +logger.debug("SQL Statement: {}", query.getCommand()); +} + +try { +QueryResult result = getInflux().query(query); +if (result.hasError()) { +throw new IOException(result.getError()); +} +return result.getResults(); +} catch (Exception e) { +throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + query.getCommand(), e); +} +} + +/** + * Request with one statement to InfluxDB and return a set of {@link QueryResult.Series}s. + * + * @param query + * @return a set of Series + * @throws IOException + */ +public List queryForSeries(Query query) throws IOException { +return query(query).get(0).getSeries(); +} + +/** + * Data management, to drop a time-series by measurement and time-series name specified. If an exception isn't + * thrown, it means execution success. + * + * @param measurement + * @param timeBucket + * @throws IOException + */ +public void dropSeries(String measurement, long timeBucket) throws IOException { +Query query = new Query("DROP SERIES FROM " + measurement + " WHERE time_bucket='" + timeBucket + "'"); +QueryResult result = getInflux().query(query); + +if (result.hasError()) { +throw new IOException("Statement: "
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r375072105 ## File path: oap-server/server-bootstrap/src/main/resources/application.yml ## @@ -75,29 +75,29 @@ core: enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute storage: -# elasticsearch: -#nameSpace: ${SW_NAMESPACE:""} -#clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} -#protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"} -##trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"} -##trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""} -#user: ${SW_ES_USER:""} -#password: ${SW_ES_PASSWORD:""} -#indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2} -#indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0} -## Those data TTL settings will override the same settings in core module. -#recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day -#otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day -#monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month -## Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html -#bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests -#flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests -#concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests -#resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:1} -#metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} -#segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200} -#profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200} -#advanced: ${SW_STORAGE_ES_ADVANCED:""} + # elasticsearch: + #nameSpace: 
${SW_NAMESPACE:""} + #clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} + #protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"} + ##trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"} + ##trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""} + #user: ${SW_ES_USER:""} + #password: ${SW_ES_PASSWORD:""} + #indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2} + #indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0} + ## Those data TTL settings will override the same settings in core module. + #recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day + #otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day + #monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month + ## Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html + #bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests + #flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + #concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests + #resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:1} + #metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} + #segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200} + #profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200} + #advanced: ${SW_STORAGE_ES_ADVANCED:""} Review comment: The formatting of these lines is still changed.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin URL: https://github.com/apache/skywalking/pull/4239#discussion_r374739055 ## File path: tools/dependencies/known-oap-backend-dependencies-es7.txt ## @@ -159,3 +159,8 @@ sundr-core-0.9.2.jar swagger-annotations-1.5.12.jar t-digest-3.2.jar zookeeper-3.4.10.jar +converter-moshi-2.5.0.jar +influxdb-java-2.15.jar +logging-interceptor-3.13.1.jar +moshi-1.5.0.jar +msgpack-core-0.8.16.jar Review comment: Which parts are confusing? Every new dependency should be added to the LICENSE, as you did for `influxdb-java`. You need to find the licenses for all the other jars.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374654305

## File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java ##

@@ -32,8 +35,13 @@
public static final String TIME_BUCKET = "time_bucket";
public static final String ENTITY_ID = "entity_id";
-@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
-@Getter @Setter private long survivalTime = 0L;
+@Getter
+@Setter
+@Column(columnName = TIME_BUCKET)
+private long timeBucket;
+@Getter
+@Setter
+private long survivalTime = 0L;

Review comment: This includes all the following changes in this class.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374654620

## File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java ##

@@ -35,6 +38,7 @@
private final List columns;
private final int scopeId;
private final boolean record;
+private final TreeMap storageColumns;

Review comment: This should not be added.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374654869

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java ##

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
+
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.influxdb.dto.BatchPoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class BatchDAO implements IBatchDAO {

Review comment: The formatting of this file does not look right.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374642128

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTTLCalculatorProvider.java ##

@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.storage.plugin.influxdb;
+
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.joda.time.DateTime;
+
+public class InfluxTTLCalculatorProvider {

Review comment: `GeneralStorageTTL` already exists. The InfluxDB implementation should be the same as that one, since it also uses the core configuration.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374659028

## File path: oap-server/server-bootstrap/src/main/resources/application.yml ##

@@ -98,33 +98,33 @@ storage:
#segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
#profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
#advanced: ${SW_STORAGE_ES_ADVANCED:""}
- elasticsearch7:
-nameSpace: ${SW_NAMESPACE:""}
-clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
-protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
-#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
-#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
-user: ${SW_ES_USER:""}
-password: ${SW_ES_PASSWORD:""}
-indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
-indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
-# Those data TTL settings will override the same settings in core module.
-recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
-otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
-monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
-# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
-flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
-concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
-resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:1}
-metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
-segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
-advanced: ${SW_STORAGE_ES_ADVANCED:""}
-# h2:
-#driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
-#url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
-#user: ${SW_STORAGE_H2_USER:sa}
-#metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
+# elasticsearch7:
+#nameSpace: ${SW_NAMESPACE:""}
+#clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
+#protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
+##trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
+##trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+#user: ${SW_ES_USER:""}
+#password: ${SW_ES_PASSWORD:""}
+#indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
+#indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+## Those data TTL settings will override the same settings in core module.
+#recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+#otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+#monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
+## Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+#bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
+#flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
+#concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
+#resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:1}
+#metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
+#segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
+#advanced: ${SW_STORAGE_ES_ADVANCED:""}
+ h2:
+driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
+url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
+user: ${SW_STORAGE_H2_USER:sa}
+metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}

Review comment: This shouldn't be changed. When starting in the IDE, we keep ES as the storage.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374653664

## File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java ##

@@ -32,8 +35,13 @@
public static final String TIME_BUCKET = "time_bucket";
public static final String ENTITY_ID = "entity_id";
-@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
-@Getter @Setter private long survivalTime = 0L;
+@Getter
+@Setter
+@Column(columnName = TIME_BUCKET)
+private long timeBucket;
+@Getter
+@Setter
+private long survivalTime = 0L;

Review comment: This class should not change, and it is not related to this PR. Please revert.
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374641089

## File path: tools/dependencies/known-oap-backend-dependencies-es7.txt ##

@@ -159,3 +159,8 @@ sundr-core-0.9.2.jar
swagger-annotations-1.5.12.jar
t-digest-3.2.jar
zookeeper-3.4.10.jar
+converter-moshi-2.5.0.jar
+influxdb-java-2.15.jar
+logging-interceptor-3.13.1.jar
+moshi-1.5.0.jar
+msgpack-core-0.8.16.jar

Review comment: The licenses of all these dependencies should be checked, and the LICENSE file should be updated (NOTICE should also be updated if the license is Apache 2.0 and a NOTICE file exists).
[GitHub] [skywalking] wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
wu-sheng commented on a change in pull request #4239: Provide influxdb as a new storage plugin
URL: https://github.com/apache/skywalking/pull/4239#discussion_r374638027

## File path: oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java ##

@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import okhttp3.OkHttpClient;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.time.TimeInterval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.ti;
+
+public class InfluxClient implements Client {
+private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Review comment: Agreed. `@Slf4j` is good enough for me.
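
For context on the logger declaration discussed above: `MethodHandles.lookup().lookupClass()` resolves to the class in which the expression is written, so the declaration can be copied between classes without editing a class literal. Lombok's `@Slf4j` removes the declaration altogether by generating a `private static final` SLF4J logger field named `log` for the annotated class. The sketch below only illustrates the lookup idiom with the JDK; the class names `LookupClassDemo` and `Nested` are hypothetical and are not part of the PR, and no SLF4J or Lombok dependency is assumed.

```java
import java.lang.invoke.MethodHandles;

public class LookupClassDemo {
    // Resolves to LookupClassDemo.class without naming the class explicitly.
    static final Class<?> OWNER = MethodHandles.lookup().lookupClass();

    static class Nested {
        // The same expression inside a nested class resolves to the nested class,
        // which is why the idiom is safe to copy and paste.
        static final Class<?> OWNER = MethodHandles.lookup().lookupClass();
    }

    public static void main(String[] args) {
        // Print the class each lookup resolved to.
        System.out.println(OWNER.getName());
        System.out.println(Nested.OWNER.getName());
    }
}
```

With `@Slf4j` the class would reference the generated `log` field instead of declaring `logger` by hand, which is the simplification the comment agrees to.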