diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..640b910
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+# Build, scanner and editor artifacts
+.yardoc
+Gemfile.lock
+FileList
+.scannerwork
+.vscode
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..2b2ccd8
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,16 @@
+# Build the binary in a Go build stage, then copy it into the runtime image.
+# The Go version matches go.mod; CGO is disabled so the binary runs on centos:7.
+FROM golang:1.12 AS builder
+
+WORKDIR /build
+COPY . .
+RUN CGO_ENABLED=0 make build
+
+FROM centos:7
+
+LABEL maintainer="12ww1160"
+
+COPY --from=builder /build/postgresql-prometheus-adapter /
+COPY start.sh /
+
+ENTRYPOINT ["/start.sh"]
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..8f2c91b
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,124 @@
+pipeline {
+    agent {
+        label 'general'
+    }
+
+    post {
+        always {
+            deleteDir() /* clean up our workspace */
+        }
+        success {
+            echo 'Pipeline finished successfully!'
+            updateGitlabCommitStatus name: 'build', state: 'success'
+        }
+        failure {
+            echo 'I failed :('
+            step([$class: 'Mailer', notifyEveryUnstableBuild: true, recipients: 'support@confdroid.com', sendToIndividuals: true])
+            updateGitlabCommitStatus name: 'build', state: 'failed'
+        }
+    }
+
+    options {
+        gitLabConnection('gitlab.confdroid.com')
+    }
+
+    stages {
+
+        stage('pull master') {
+            steps {
+                sshagent(['edd05eb6-26b5-4c7b-a5cc-ea2ab899f4fa']) {
+                    sh '''
+                        git config user.name "Jenkins Server"
+                        git config user.email jenkins@confdroid.com
+                        # Ensure we're on the development branch (triggered by push)
+                        git checkout development
+                        # Create jenkins branch from development
+                        git checkout -b jenkins-build-$BUILD_NUMBER
+                        # Optionally merge master into jenkins to ensure compatibility
+                        git merge origin/master --no-ff || { echo "Merge conflict detected"; exit 1; }
+                    '''
+                }
+            }
+        }
+
+        stage('SonarScan') {
+            steps {
+                withCredentials([string(credentialsId: 'sonar-token', variable: 'SONAR_TOKEN')]) {
+                    sh '''
+                        /opt/sonar-scanner/bin/sonar-scanner \
+                            -Dsonar.projectKey=prometheus-pg-adapter \
+                            -Dsonar.sources=. \
+                            -Dsonar.host.url=https://sonarqube.confdroid.com \
+                            -Dsonar.token=$SONAR_TOKEN
+                    '''
+                }
+            }
+        }
+
+        stage('build and push image to gitlab') {
+            steps {
+                withCredentials([usernamePassword(credentialsId: '864a3edb-9d6e-4ad1-b382-22eeb0ea6b8a', passwordVariable: 'pw', usernameVariable: 'un')]) {
+                    sh '''
+                        set +xe
+                        docker login gitlab.confdroid.com:5050 -u $un -p $pw
+                        docker build --network=host -t gitlab.confdroid.com:5050/containers/prometheus-pg-adapter:1.0.0 .
+                        docker tag gitlab.confdroid.com:5050/containers/prometheus-pg-adapter:1.0.0 gitlab.confdroid.com:5050/containers/prometheus-pg-adapter:latest
+                        docker push gitlab.confdroid.com:5050/containers/prometheus-pg-adapter:1.0.0
+                        docker push gitlab.confdroid.com:5050/containers/prometheus-pg-adapter:latest
+                    '''
+                }
+            }
+        }
+
+        stage('build and push image to gitea') {
+            steps {
+                withCredentials([usernamePassword(credentialsId: 'Jenkins-gitea', passwordVariable: 'pw', usernameVariable: 'un')]) {
+                    sh '''
+                        set +xe
+                        docker login gitea.confdroid.com -u $un -p $pw
+                        docker build --network=host -t gitea.confdroid.com/confdroid/prometheus-pg-adapter:1.0.0 .
+                        docker tag gitea.confdroid.com/confdroid/prometheus-pg-adapter:1.0.0 gitea.confdroid.com/confdroid/prometheus-pg-adapter:latest
+                        docker push gitea.confdroid.com/confdroid/prometheus-pg-adapter:1.0.0
+                        docker push gitea.confdroid.com/confdroid/prometheus-pg-adapter:latest
+                    '''
+                }
+            }
+        }
+
+        stage('update repo') {
+            steps {
+                sshagent(['edd05eb6-26b5-4c7b-a5cc-ea2ab899f4fa']) {
+                    sh '''
+                        git config user.name "Jenkins Server"
+                        git config user.email jenkins@confdroid.com
+                        git add -A && git commit -am "Recommit for updates in build $BUILD_NUMBER" || echo "No changes to commit"
+                        git push origin HEAD:master
+
+                    '''
+                }
+            }
+        }
+
+        stage('Mirror to Gitea') {
+            steps {
+                withCredentials([usernamePassword(
+                    credentialsId: 'Jenkins-gitea',
+                    usernameVariable: 'GITEA_USER',
+                    passwordVariable: 'GITEA_TOKEN')]) {
+                    script {
+                        // Mirror the refreshed master from GitLab to Gitea
+                        sh '''
+                            git checkout master
+                            git pull origin master
+                            git branch -D development
+                            git branch -D jenkins-build-$BUILD_NUMBER
+                            git remote add gitea https://gitea.confdroid.com/confdroid/prometheus-pg-adapter.git
+                            git -c credential.helper="!f() { echo username=${GITEA_USER}; echo password=${GITEA_TOKEN}; }; f" \
+                                push gitea --mirror
+                        '''
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..702584d
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2019 Crunchy Data Solutions, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
index 27dfccc..c256af1 100644
--- a/README.md
+++ b/README.md
@@ -1,93 +1,24 @@
-# prometheus-pg-adapter
+# README
+
+[![Build Status](https://jenkins.confdroid.com/buildStatus/icon?job=prometheus-pg-adapter)](https://jenkins.confdroid.com/job/prometheus-pg-adapter/)
+
+## Purpose
+
+This image is entirely based on the [PostgreSQL Prometheus Adapter](https://github.com/CrunchyData/postgresql-prometheus-adapter) from [Crunchy Data](https://3for.me/w5wl2).
+
+> PostgreSQL Prometheus Adapter is a remote storage adapter designed to utilize PostgreSQL 12 native partitioning enhancements to efficiently store Prometheus time series data in a PostgreSQL database.
+
+The adapter's design is based on partitioning and threads: incoming data is processed by one or more parser threads, and one or more writer threads store it in daily or hourly PostgreSQL partitions.
Partitions will be auto-created by the adapter based on the timestamp of incoming data. - -## Getting started - -To make it easy for you to get started with GitLab, here's a list of recommended next steps. - -Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)! - -## Add your files - -- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files -- [ ] [Add files using the command line](https://docs.gitlab.com/topics/git/add_files/#add-files-to-a-git-repository) or push an existing Git repository with the following command: - -``` -cd existing_repo -git remote add origin https://gitlab.confdroid.com/containers/prometheus-pg-adapter.git -git branch -M master -git push -uf origin master -``` - -## Integrate with your tools - -- [ ] [Set up project integrations](https://gitlab.confdroid.com/containers/prometheus-pg-adapter/-/settings/integrations) - -## Collaborate with your team - -- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/) -- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html) -- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically) -- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/) -- [ ] [Set auto-merge](https://docs.gitlab.com/user/project/merge_requests/auto_merge/) - -## Test and Deploy - -Use the built-in continuous integration in GitLab. - -- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/) -- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing (SAST)](https://docs.gitlab.com/ee/user/application_security/sast/) -- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html) -- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/) -- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html) - -*** - -# Editing this README - -When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thanks to [makeareadme.com](https://www.makeareadme.com/) for this template. - -## Suggestions for a good README - -Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information. - -## Name -Choose a self-explaining name for your project. - -## Description -Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors. 
- -## Badges -On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge. - -## Visuals -Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method. - -## Installation -Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection. - -## Usage -Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README. - -## Support -Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc. - -## Roadmap -If you have ideas for releases in the future, it is a good idea to list them in the README. - -## Contributing -State if you are open to contributions and what your requirements are for accepting them. - -For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self. - -You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser. - -## Authors and acknowledgment -Show your appreciation to those who have contributed to the project. - -## License -For open source projects, say how it is licensed. - -## Project status -If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers. 
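+
+## Usage
+
+Prometheus ships samples to the adapter through its remote storage API. A minimal `prometheus.yml` snippet could look like this (a sketch: the hostname is a placeholder, `:9201` is the adapter's default listen port):
+
+```yaml
+remote_write:
+  - url: "http://prometheus-pg-adapter:9201/write"
+remote_read:
+  - url: "http://prometheus-pg-adapter:9201/read"
+```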
diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9e350b8 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/crunchydata/postgresql-prometheus-adapter + +go 1.12 + +require ( + github.com/go-kit/kit v0.12.0 + github.com/gogo/protobuf v1.3.2 + github.com/golang/snappy v0.0.4 + github.com/jackc/pgx/v4 v4.16.1 + github.com/prometheus/client_golang v1.12.1 + github.com/prometheus/common v0.34.0 + github.com/prometheus/prometheus v0.35.0 + golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect + golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect + golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect + gopkg.in/alecthomas/kingpin.v2 v2.2.6 +) diff --git a/main.go b/main.go new file mode 100644 index 0000000..fcbb54e --- /dev/null +++ b/main.go @@ -0,0 +1,372 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// Copyright 2019 Crunchy Data +// +// Based on the Prometheus remote storage example: +// documentation/examples/remote_storage/remote_storage_adapter/main.go +// + +// The main package for the Prometheus server executable. +package main + +import ( + "fmt" + "io/ioutil" + "net/http" + _ "net/http/pprof" + "os" + "os/signal" + "time" + + "path/filepath" + + "github.com/crunchydata/postgresql-prometheus-adapter/pkg/postgresql" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + + //"github.com/jamiealquiza/envy" + + "github.com/prometheus/common/promlog" + "github.com/prometheus/common/promlog/flag" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/prompb" + + //"github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" + "gopkg.in/alecthomas/kingpin.v2" + //"flag" +) + +var Version = "development" + +type config struct { + remoteTimeout time.Duration + listenAddr string + telemetryPath string + pgPrometheusConfig postgresql.Config + logLevel string + haGroupLockId int + prometheusTimeout time.Duration + promlogConfig promlog.Config +} + +const ( + tickInterval = time.Second + promLivenessCheck = time.Second + maxBgWriter = 10 + maxBgParser = 20 +) + +var ( + receivedSamples = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "received_samples_total", + Help: "Total number of received samples.", + }, + ) + sentSamples = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "sent_samples_total", + Help: "Total number of processed samples sent to remote storage.", + }, + []string{"remote"}, + ) + failedSamples = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "failed_samples_total", + Help: "Total number of processed samples which failed on send to remote storage.", + }, + []string{"remote"}, + ) + sentBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send 
calls to the remote storage.",
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"remote"},
+	)
+	httpRequestDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "http_request_duration_ms",
+			Help:    "Duration of HTTP request in milliseconds",
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"path"},
+	)
+)
+
+var worker [maxBgWriter]postgresql.PGWriter
+
+func init() {
+	prometheus.MustRegister(receivedSamples)
+	prometheus.MustRegister(sentSamples)
+	prometheus.MustRegister(failedSamples)
+	prometheus.MustRegister(sentBatchDuration)
+	prometheus.MustRegister(httpRequestDuration)
+}
+
+func main() {
+	cfg := parseFlags()
+	logger := promlog.New(&cfg.promlogConfig)
+	level.Info(logger).Log("config", fmt.Sprintf("%+v", cfg))
+	level.Info(logger).Log("pgPrometheusConfig", fmt.Sprintf("%+v", cfg.pgPrometheusConfig))
+
+	// Clamp the writer and parser counts to the supported 1..max range.
+	if cfg.pgPrometheusConfig.PGWriters < 1 {
+		cfg.pgPrometheusConfig.PGWriters = 1
+	}
+	if cfg.pgPrometheusConfig.PGWriters > maxBgWriter {
+		cfg.pgPrometheusConfig.PGWriters = maxBgWriter
+	}
+
+	if cfg.pgPrometheusConfig.PGParsers < 1 {
+		cfg.pgPrometheusConfig.PGParsers = 1
+	}
+	if cfg.pgPrometheusConfig.PGParsers > maxBgParser {
+		cfg.pgPrometheusConfig.PGParsers = maxBgParser
+	}
+
+	http.Handle(cfg.telemetryPath, promhttp.Handler())
+	writer, reader := buildClients(logger, cfg)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt)
+	go func() {
+		for sig := range c {
+			fmt.Printf("Signal: %v\n", sig)
+			for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
+				fmt.Printf("Calling shutdown %d\n", t)
+				worker[t].PGWriterShutdown()
+			}
+			for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
+				for worker[t].Running {
+					time.Sleep(1 * time.Second)
+					fmt.Printf("Waiting for shutdown %d...\n", t)
+				}
+			}
+			os.Exit(0)
+		}
+	}()
+	for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
+		go worker[t].RunPGWriter(logger, t, cfg.pgPrometheusConfig.CommitSecs, cfg.pgPrometheusConfig.CommitRows, cfg.pgPrometheusConfig.PGParsers, cfg.pgPrometheusConfig.PartitionScheme)
+		defer worker[t].PGWriterShutdown()
+	}
+
+	level.Info(logger).Log("msg", "Starting HTTP listener")
+
+	http.Handle("/write", timeHandler("write", write(logger, writer)))
+	http.Handle("/read", timeHandler("read", read(logger, reader)))
+
+	level.Info(logger).Log("msg", "Starting up...")
+	level.Info(logger).Log("msg", "Listening", "addr", cfg.listenAddr)
+
+	err := http.ListenAndServe(cfg.listenAddr, nil)
+
+	// ListenAndServe blocks and only returns on a listen failure.
+
+	if err != nil {
+		level.Error(logger).Log("msg", "Listen failure", "err", err)
+		os.Exit(1)
+	}
+}
+
+func parseFlags() *config {
+	a := kingpin.New(filepath.Base(os.Args[0]), fmt.Sprintf("Remote storage adapter [ PostgreSQL ], Version: %s", Version))
+	a.HelpFlag.Short('h')
+
+	cfg := &config{
+		promlogConfig: promlog.Config{},
+	}
+
+	a.Flag("adapter-send-timeout", "The timeout to use when sending samples to the remote storage.").Default("30s").DurationVar(&cfg.remoteTimeout)
+	a.Flag("web-listen-address", "Address to listen on for web endpoints.").Default(":9201").StringVar(&cfg.listenAddr)
+	a.Flag("web-telemetry-path", "Path under which the adapter's own metrics are exposed.").Default("/metrics").StringVar(&cfg.telemetryPath)
+	flag.AddFlags(a, &cfg.promlogConfig)
+
+	a.Flag("pg-partition", "daily or hourly partitions, default: hourly").Default("hourly").StringVar(&cfg.pgPrometheusConfig.PartitionScheme)
+	a.Flag("pg-commit-secs", "Write data to database every N seconds").Default("15").IntVar(&cfg.pgPrometheusConfig.CommitSecs)
a.Flag("pg-commit-rows", "Write data to database every N Rows").Default("20000").IntVar(&cfg.pgPrometheusConfig.CommitRows) + a.Flag("pg-threads", "Writer DB threads to run 1-10").Default("1").IntVar(&cfg.pgPrometheusConfig.PGWriters) + a.Flag("parser-threads", "parser threads to run per DB writer 1-10").Default("5").IntVar(&cfg.pgPrometheusConfig.PGParsers) + + _, err := a.Parse(os.Args[1:]) + if err != nil { + fmt.Fprintln(os.Stderr, "Error parsing commandline arguments") + a.Usage(os.Args[1:]) + os.Exit(2) + } + + return cfg +} + +type writer interface { + Write(samples model.Samples) error + Name() string +} + +type reader interface { + Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) + Name() string + HealthCheck() error +} + +func buildClients(logger log.Logger, cfg *config) (writer, reader) { + pgClient := postgresql.NewClient(log.With(logger, "storage", "PostgreSQL"), &cfg.pgPrometheusConfig) + + return pgClient, pgClient +} + +func write(logger log.Logger, writer writer) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + level.Error(logger).Log("msg", "Read error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + level.Error(logger).Log("msg", "Decode error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + samples := protoToSamples(&req) + receivedSamples.Add(float64(len(samples))) + + err = sendSamples(writer, samples) + if err != nil { + level.Warn(logger).Log("msg", "Error sending samples to remote storage", "err", err, "storage", writer.Name(), "num_samples", len(samples)) + } + + }) +} + +func read(logger log.Logger, reader reader) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + level.Error(logger).Log("msg", "Read error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + level.Error(logger).Log("msg", "Decode error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var resp *prompb.ReadResponse + resp, err = reader.Read(&req) + if err != nil { + fmt.Printf("MAIN req.Queries: %v\n", req.Queries) + level.Warn(logger).Log("msg", "Error executing query", "query", req, "storage", reader.Name(), "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + data, err := proto.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed = snappy.Encode(nil, data) + if _, err := w.Write(compressed); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) +} + +func health(reader reader) http.Handler 
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		err := reader.HealthCheck()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Length", "0")
+	})
+}
+
+func protoToSamples(req *prompb.WriteRequest) model.Samples {
+	var samples model.Samples
+	for _, ts := range req.Timeseries {
+		metric := make(model.Metric, len(ts.Labels))
+		for _, l := range ts.Labels {
+			metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+		}
+
+		for _, s := range ts.Samples {
+			samples = append(samples, &model.Sample{
+				Metric:    metric,
+				Value:     model.SampleValue(s.Value),
+				Timestamp: model.Time(s.Timestamp),
+			})
+		}
+	}
+	return samples
+}
+
+func sendSamples(w writer, samples model.Samples) error {
+	begin := time.Now()
+	err := w.Write(samples)
+	duration := time.Since(begin).Seconds()
+	if err != nil {
+		failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
+		return err
+	}
+	sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
+	sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
+	return nil
+}
+
+// timeHandler uses a Prometheus histogram to track request time
+func timeHandler(path string, handler http.Handler) http.Handler {
+	f := func(w http.ResponseWriter, r *http.Request) {
+		start := time.Now()
+		handler.ServeHTTP(w, r)
+		elapsedMs := time.Since(start).Nanoseconds() / int64(time.Millisecond)
+		httpRequestDuration.WithLabelValues(path).Observe(float64(elapsedMs))
+	}
+	return http.HandlerFunc(f)
+}
diff --git a/makefile b/makefile
new file mode 100644
index 0000000..f8f20f5
--- /dev/null
+++ b/makefile
@@ -0,0 +1,28 @@
+VERSION=1.1
+ORGANIZATION=crunchydata
+
+SOURCES:=$(shell find . -name '*.go' | grep -v './vendor')
+
+TARGET:=postgresql-prometheus-adapter
+
+.PHONY: all clean build container container-save
+
+all: $(TARGET)
+
+build: $(TARGET)
+
+$(TARGET): main.go $(SOURCES)
+	go build -ldflags="-X 'main.Version=${VERSION}'" -o $(TARGET)
+
+container: $(TARGET) Dockerfile
+	@#podman rmi $(ORGANIZATION)/$(TARGET):latest $(ORGANIZATION)/$(TARGET):$(VERSION)
+	podman build -t $(ORGANIZATION)/$(TARGET):latest .
+	podman tag $(ORGANIZATION)/$(TARGET):latest $(ORGANIZATION)/$(TARGET):$(VERSION)
+
+container-save: container
+	rm -f $(TARGET)-$(VERSION).tar
+	podman save --output=$(TARGET)-$(VERSION).tar $(ORGANIZATION)/$(TARGET):$(VERSION)
+
+clean:
+	rm -f *~ $(TARGET)
+
diff --git a/pkg/postgresql/client.go b/pkg/postgresql/client.go
new file mode 100644
index 0000000..1cdfe51
--- /dev/null
+++ b/pkg/postgresql/client.go
@@ -0,0 +1,629 @@
+package postgresql
+
+import (
+	"container/list"
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"reflect"
+	"runtime"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/jackc/pgx/v4"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/prompb"
+)
+
+type tMetricIDMap map[string]int64
+
+// Config for the database
+type Config struct {
+	CommitSecs      int
+	CommitRows      int
+	PGWriters       int
+	PGParsers       int
+	PartitionScheme string
+}
+
+var promSamples = list.New()
+
+// QueueMutex is used for thread-safe operations on the promSamples list.
+var QueueMutex sync.Mutex
+var vMetricIDMapMutex sync.Mutex
+var vMetricIDMap tMetricIDMap
+
+// PGWriter - Threaded writer
+type PGWriter struct {
+	DB          *pgx.Conn
+	id          int
+	KeepRunning bool
+	Running     bool
+
+	valueRows [][]interface{}
+	labelRows [][]interface{}
+
+	PGWriterMutex sync.Mutex
+	logger        log.Logger
+}
+
+// PGParser - Threaded parser
+type PGParser struct {
+	id          int
+	KeepRunning bool
+	Running     bool
+
+	lastPartitionTS time.Time
+	valueRows       [][]interface{}
+}
+
+// RunPGParser parses queued samples into rows for the writer until a
+// shutdown is requested.
+func (p *PGParser) RunPGParser(tid int, partitionScheme string, c *PGWriter) {
+	var samples *model.Samples
+	p.id = tid
+	level.Info(c.logger).Log(fmt.Sprintf("bgparser%d", p.id), "Started")
+	p.Running = true
+	p.KeepRunning = true
+
+	// Process queued sample batches until shutdown is requested.
+	for p.KeepRunning {
+		samples = Pop()
+		if samples != nil {
+			for _, sample := range *samples {
+				sMetric := metricString(sample.Metric)
+				ts := time.Unix(sample.Timestamp.Unix(), 0)
+				milliseconds := sample.Timestamp.UnixNano() / 1000000
+				if ts.Year() != p.lastPartitionTS.Year() ||
+					ts.Month() != p.lastPartitionTS.Month() ||
+					ts.Day() != p.lastPartitionTS.Day() {
+					p.lastPartitionTS = ts
+					_ = c.setupPgPartitions(partitionScheme, p.lastPartitionTS)
+				}
+				vMetricIDMapMutex.Lock()
+				id, ok := vMetricIDMap[sMetric]
+
+				if !ok {
+					nextID := int64(len(vMetricIDMap) + 1)
+					vMetricIDMap[sMetric] = nextID
+					i := strings.Index(sMetric, "{")
+					jsonbMap := make(map[string]interface{})
+					json.Unmarshal([]byte(sMetric[i:]), &jsonbMap)
+					// labelRows is shared with the writer, so guard it with
+					// the writer's mutex rather than the map mutex.
+					c.PGWriterMutex.Lock()
+					c.labelRows = append(c.labelRows, []interface{}{nextID, sMetric[:i], sMetric, jsonbMap})
+					c.PGWriterMutex.Unlock()
+					id = nextID
+				}
+				vMetricIDMapMutex.Unlock()
+				p.valueRows = append(p.valueRows, []interface{}{id, toTimestamp(milliseconds), float64(sample.Value)})
+			}
+			// Hand the parsed rows over to the writer under its mutex.
+			c.PGWriterMutex.Lock()
+			c.valueRows = append(c.valueRows, p.valueRows...)
+			c.PGWriterMutex.Unlock()
+			p.valueRows = nil
+			runtime.GC()
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+	level.Info(c.logger).Log(fmt.Sprintf("bgparser%d", p.id), "Shutdown")
+	p.Running = false
+}
+
+// PGParserShutdown requests a graceful shutdown
+func (p *PGParser) PGParserShutdown() {
+	p.KeepRunning = false
+}
+
+// RunPGWriter connects to the database and periodically flushes parsed rows
+// until a shutdown is requested.
+func (c *PGWriter) RunPGWriter(l log.Logger, tid int, commitSecs int, commitRows int, Parsers int, partitionScheme string) {
+	c.logger = l
+	c.id = tid
+	period := commitSecs * 1000
+	var err error
+	var parser [20]PGParser // sized to the maximum parser count
+	c.DB, err = pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
+	if err != nil {
+		level.Error(c.logger).Log("err", err)
+		os.Exit(1)
+	}
+	if c.id == 0 {
+		c.setupPgPrometheus()
+		_ = c.setupPgPartitions(partitionScheme, time.Now())
+	}
+	level.Info(c.logger).Log(fmt.Sprintf("bgwriter%d", c.id), fmt.Sprintf("Starting %d Parsers", Parsers))
+	for p := 0; p < Parsers; p++ {
+		go parser[p].RunPGParser(p, partitionScheme, c)
+		defer parser[p].PGParserShutdown()
+	}
+	level.Info(c.logger).Log(fmt.Sprintf("bgwriter%d", c.id), "Started")
+	c.Running = true
+	c.KeepRunning = true
+	// Flush on the commit interval, or earlier once enough rows have
+	// accumulated, until shutdown is requested.
+	for c.KeepRunning {
+		if (period <= 0 && len(c.valueRows) > 0) || (len(c.valueRows) > commitRows) {
+			c.PGWriterSave()
+			period = commitSecs * 1000
+		} else {
+			time.Sleep(10 * time.Millisecond)
+			period -= 10
+		}
+	}
+	c.PGWriterSave()
+	level.Info(c.logger).Log(fmt.Sprintf("bgwriter%d", c.id), "Shutdown")
+	c.Running = false
+}
+
+// PGWriterShutdown - Set shutdown flag for graceful shutdown
+func (c *PGWriter) PGWriterShutdown() {
+	c.KeepRunning = false
+}
+
+// PGWriterSave flushes the buffered label and value rows to the database
+func (c *PGWriter) PGWriterSave() {
+	var copyCount, lblCount, rowCount int64
+	var err error
+	begin := time.Now()
+	c.PGWriterMutex.Lock()
+	lblCount = int64(len(c.labelRows))
+	if lblCount > 0 {
+		copyCount, err = c.DB.CopyFrom(context.Background(), pgx.Identifier{"metric_labels"}, []string{"metric_id", "metric_name", "metric_name_label", "metric_labels"}, pgx.CopyFromRows(c.labelRows))
+		c.labelRows = nil
+		if err != nil {
+			level.Error(c.logger).Log("msg", "COPY failed for metric_labels", "err", err)
+		}
+		if copyCount != lblCount {
+			level.Error(c.logger).Log("msg", "All rows not copied metric_labels", "err", err)
+		}
+	}
+	copyCount, err = c.DB.CopyFrom(context.Background(), pgx.Identifier{"metric_values"}, []string{"metric_id", "metric_time", "metric_value"}, pgx.CopyFromRows(c.valueRows))
+	rowCount = int64(len(c.valueRows))
+	c.valueRows = nil
+	c.PGWriterMutex.Unlock()
+	if err != nil {
+		level.Error(c.logger).Log("msg", "COPY failed for metric_values", "err", err)
+	}
+	if copyCount != rowCount {
+		level.Error(c.logger).Log("msg", "All rows not copied metric_values", "err", err)
+	}
+	duration := time.Since(begin).Seconds()
+	level.Info(c.logger).Log("metric", fmt.Sprintf("BGWriter%d: Processed samples count,%d, duration,%v", c.id, rowCount+lblCount, duration))
+}
+
+// Push adds a sample batch at the end of the list
+func Push(samples *model.Samples) {
+	QueueMutex.Lock()
+	promSamples.PushBack(samples)
+	QueueMutex.Unlock()
+}
+
+// Pop removes and returns the first sample batch from the list
+func Pop() *model.Samples {
+	QueueMutex.Lock()
+	defer QueueMutex.Unlock()
+	p := promSamples.Front()
+	if p != nil {
+		return promSamples.Remove(p).(*model.Samples)
+	}
+	return nil
+}
+
+// Client - struct to hold critical values
+type Client struct {
+	logger log.Logger
+	DB     *pgx.Conn
+	cfg    *Config
+}
+
+// NewClient creates a new PostgreSQL client
+func NewClient(logger log.Logger, cfg *Config) *Client {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	conn1, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "Error: Unable to connect to database using DATABASE_URL=", os.Getenv("DATABASE_URL"))
+		os.Exit(1)
+	}
+
+	client := &Client{
+		logger: logger,
+		DB:     conn1,
+		cfg:    cfg,
+	}
+
+	return client
+}
+
+func (c *PGWriter) setupPgPrometheus() error {
+	level.Info(c.logger).Log("msg", "creating tables")
+
+	_, err := c.DB.Exec(context.Background(), "CREATE TABLE IF NOT EXISTS metric_labels ( metric_id BIGINT PRIMARY KEY, metric_name TEXT NOT NULL, metric_name_label TEXT NOT NULL, metric_labels jsonb, UNIQUE(metric_name, metric_labels) )")
+	if err != nil {
+		return err
+	}
+
+	_, err = c.DB.Exec(context.Background(), "CREATE INDEX IF NOT EXISTS metric_labels_labels_idx ON metric_labels USING GIN (metric_labels)")
+	if err != nil {
+		return err
+	}
+
+	_, err = c.DB.Exec(context.Background(), "CREATE TABLE IF NOT EXISTS metric_values (metric_id BIGINT, metric_time TIMESTAMPTZ, metric_value FLOAT8 ) PARTITION BY RANGE (metric_time)")
+	if err != nil {
+		return err
+	}
+
+	_, err = c.DB.Exec(context.Background(), "CREATE INDEX IF NOT EXISTS metric_values_id_time_idx on metric_values USING btree (metric_id, metric_time DESC)")
+	if err != nil {
+		return err
+	}
+
+	_, err = c.DB.Exec(context.Background(), "CREATE INDEX IF NOT EXISTS metric_values_time_idx on metric_values USING btree (metric_time DESC)")
+	if err != nil {
+		return err
+	}
+
+	vMetricIDMapMutex.Lock()
+	defer vMetricIDMapMutex.Unlock()
+	vMetricIDMap = make(tMetricIDMap)
+	rows, err1 := c.DB.Query(context.Background(), "SELECT metric_name_label, metric_id from metric_labels")
+
+	if err1 != nil {
+		rows.Close()
+		level.Error(c.logger).Log("msg", "Error reading metric_labels", "err", err1)
+		return err1
+	}
+
+	for rows.Next() {
+		var (
+			metricNameLabel string
+			metricID        int64
+		)
+		err := rows.Scan(&metricNameLabel, &metricID)
+
+		if err != nil {
+			rows.Close()
+			level.Error(c.logger).Log("msg", "Error scanning metric_labels", "err", err)
+			return err
+		}
+		vMetricIDMap[metricNameLabel] = metricID
+	}
+	level.Info(c.logger).Log("msg", fmt.Sprintf("%d rows loaded into metric map", len(vMetricIDMap)))
+	rows.Close()
+
+	return nil
+}
+
+func (c *PGWriter) setupPgPartitions(partitionScheme string, lastPartitionTS time.Time) error {
+	sDate := lastPartitionTS
+	eDate := sDate
+	if partitionScheme == "daily" {
+		level.Info(c.logger).Log("msg", "Creating partition, daily")
+		_, err := c.DB.Exec(context.Background(), fmt.Sprintf("CREATE TABLE IF NOT EXISTS metric_values_%s PARTITION OF metric_values FOR VALUES FROM ('%s 00:00:00') TO ('%s 00:00:00')", sDate.Format("20060102"), sDate.Format("2006-01-02"), eDate.AddDate(0, 0, 1).Format("2006-01-02")))
+		if err != nil {
+			return err
+		}
+	} else if partitionScheme == "hourly" {
+		sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS metric_values_%s PARTITION OF metric_values FOR VALUES FROM ('%s 00:00:00') TO ('%s 00:00:00') PARTITION BY RANGE (metric_time);", sDate.Format("20060102"), sDate.Format("2006-01-02"), eDate.AddDate(0, 0, 1).Format("2006-01-02"))
+		var h int
+		// Hours 00-22; the final 23:00 partition is appended below so its
+		// upper bound can roll over to the next day's 00:00.
+		for h = 0; h < 23; h++ {
+			sql = fmt.Sprintf("%s CREATE TABLE IF NOT EXISTS metric_values_%s_%02d PARTITION OF metric_values_%s FOR VALUES FROM ('%s %02d:00:00') TO ('%s %02d:00:00');", sql, sDate.Format("20060102"), h, sDate.Format("20060102"), sDate.Format("2006-01-02"), h, eDate.Format("2006-01-02"), h+1)
+		}
+		level.Info(c.logger).Log("msg", "Creating partition, hourly")
+		_, err := c.DB.Exec(context.Background(), fmt.Sprintf("%s CREATE TABLE IF NOT EXISTS metric_values_%s_%02d PARTITION OF metric_values_%s FOR VALUES FROM ('%s %02d:00:00') TO ('%s 00:00:00');", sql, sDate.Format("20060102"), h, sDate.Format("20060102"), sDate.Format("2006-01-02"), h, eDate.AddDate(0, 0, 1).Format("2006-01-02")))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// metricString renders a metric as name{"label": "value", ...}, for example
+// http_requests_total{"code": "200", "method": "get"}.
+func metricString(m model.Metric) string {
+	metricName, hasName := m[model.MetricNameLabel]
+	numLabels := len(m) - 1
+	if !hasName {
+		numLabels = len(m)
+	}
+	labelStrings := make([]string, 0, numLabels)
+	for label, value := range m {
+		if label != model.MetricNameLabel {
+			labelStrings = append(labelStrings, fmt.Sprintf("\"%s\": %q", label, value))
+		}
+	}
+
+	switch numLabels {
+	case 0:
+		if hasName {
+			return string(metricName)
+		}
+		return "{}"
+	default:
+		sort.Strings(labelStrings)
+		return fmt.Sprintf("%s{%s}", metricName, strings.Join(labelStrings, ", "))
+	}
+}
+
+// Write implements the Writer interface and writes metric samples to the database
+func (c *Client) Write(samples model.Samples) error {
+	Push(&samples)
+	return nil
+}
+
+type sampleLabels struct {
+	JSON        []byte
+	Map         map[string]string
+	OrderedKeys []string
+}
+
+func createOrderedKeys(m *map[string]string) []string {
+	keys := make([]string, 0, len(*m))
+	for k := range *m {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
+
+// Close - Close database connections
+func (c *Client) Close() {
+	if c.DB != nil {
+		if err1 := c.DB.Close(context.Background()); err1 != nil {
+			level.Error(c.logger).Log("msg", err1.Error())
+		}
+	}
+}
+
+func (l *sampleLabels) Scan(value interface{}) error {
+	if value == nil {
+		*l = sampleLabels{}
+		return nil
+	}
+
+	switch t := value.(type) {
+	case []uint8:
+		m := make(map[string]string)
+		err := json.Unmarshal(t, &m)
+
+		if err != nil {
+			return err
+		}
+
+		*l = sampleLabels{
+			JSON:        t,
+			Map:         m,
+			OrderedKeys: createOrderedKeys(&m),
+		}
+		return nil
+	}
+	return fmt.Errorf("invalid labels value %s", reflect.TypeOf(value))
+}
+
+func (l sampleLabels) String() string {
+	return string(l.JSON)
+}
+
+func (l sampleLabels) key(extra string) string {
+	// 0xff cannot occur in valid UTF-8 sequences, so use it
+	// as a separator here.
+	separator := "\xff"
+	pairs := make([]string, 0, len(l.Map)+1)
+	pairs = append(pairs, extra+separator)
+
+	for _, k := range l.OrderedKeys {
+		pairs = append(pairs, k+separator+l.Map[k])
+	}
+	return strings.Join(pairs, separator)
+}
+
+func (l *sampleLabels) len() int {
+	return len(l.OrderedKeys)
+}
+
+// Read implements the Reader interface and reads metric samples from the database
+func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
+
+	labelsToSeries := map[string]*prompb.TimeSeries{}
+
+	for _, q := range req.Queries {
+		command, err := c.buildCommand(q)
+
+		if err != nil {
+			return nil, err
+		}
+
+		level.Debug(c.logger).Log("msg", "Executed query", "query", command)
+
+		rows, err := c.DB.Query(context.Background(), command)
+
+		if err != nil {
+			rows.Close()
+			return nil, err
+		}
+
+		for rows.Next() {
+			var (
+				value  float64
+				name   string
+				labels sampleLabels
+				time   time.Time
+			)
+			err := rows.Scan(&time, &name, &value, &labels)
+
+			if err != nil {
+				rows.Close()
+				return nil, err
+			}
+
+			key := labels.key(name)
+			ts, ok := labelsToSeries[key]
+
+			if !ok {
+				labelPairs := make([]prompb.Label, 0, labels.len()+1)
+				labelPairs = append(labelPairs, prompb.Label{
+					Name:  model.MetricNameLabel,
+					Value: name,
+				})
+
+				for _, k := range labels.OrderedKeys {
+					labelPairs = append(labelPairs, prompb.Label{
+						Name:  k,
+						Value: labels.Map[k],
+					})
+				}
+
+				ts = &prompb.TimeSeries{
+					Labels:  labelPairs,
+					Samples: make([]prompb.Sample, 0, 100),
+				}
+				labelsToSeries[key] = ts
+			}
+
+			ts.Samples = append(ts.Samples, prompb.Sample{
+				Timestamp: time.UnixNano() / 1000000,
+				Value:     value,
+			})
+		}
+
+		err = rows.Err()
+		rows.Close()
+
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	resp := prompb.ReadResponse{
+		Results: []*prompb.QueryResult{
+			{
+				Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries)),
+			},
+		},
+	}
+	for _, ts := range labelsToSeries {
+		resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
+	}
+
+	level.Debug(c.logger).Log("msg", "Returned response", "#timeseries", len(labelsToSeries))
+
+	return &resp, nil
+}
+
+// HealthCheck implements the healthcheck interface
+func (c *Client) HealthCheck() error {
+	rows, err := c.DB.Query(context.Background(), "SELECT 1")
+	defer rows.Close()
+	if err != nil {
+		level.Debug(c.logger).Log("msg", "Health check error", "err", err)
+		return err
+	}
+
+	return nil
+}
+
+func toTimestamp(milliseconds int64) time.Time {
+	sec := milliseconds / 1000
+	nsec := (milliseconds - (sec * 1000)) * 1000000
+	return time.Unix(sec, nsec).UTC()
+}
+
+func (c *Client) buildQuery(q *prompb.Query) (string, error) {
+	matchers := make([]string, 0, len(q.Matchers))
+	labelEqualPredicates := make(map[string]string)
+
+	for _, m := range q.Matchers {
+		escapedName := escapeValue(m.Name)
+		escapedValue := escapeValue(m.Value)
+
+		if m.Name == model.MetricNameLabel {
+			switch m.Type {
+			case prompb.LabelMatcher_EQ:
+				if len(escapedValue) == 0 {
+					matchers = append(matchers, "(l.metric_name IS NULL OR l.metric_name = '')")
+				} else {
+					matchers = append(matchers, fmt.Sprintf("l.metric_name = '%s'", escapedValue))
+				}
+			case prompb.LabelMatcher_NEQ:
+				matchers = append(matchers, fmt.Sprintf("l.metric_name != '%s'", escapedValue))
+			case prompb.LabelMatcher_RE:
+				matchers = append(matchers, fmt.Sprintf("l.metric_name ~ '%s'", anchorValue(escapedValue)))
+			case prompb.LabelMatcher_NRE:
+				matchers = append(matchers, fmt.Sprintf("l.metric_name !~ '%s'", anchorValue(escapedValue)))
+			default:
+				return "", fmt.Errorf("unknown metric name match type %v", m.Type)
+			}
+		} else {
+			switch m.Type {
+			case prompb.LabelMatcher_EQ:
+				if len(escapedValue) == 0 {
+					// From the PromQL docs: "Label matchers that match
+					// empty label values also select all time series that
+					// do not have the specific label set at all."
+					matchers = append(matchers, fmt.Sprintf("((l.metric_labels ? '%s') = false OR (l.metric_labels->>'%s' = ''))",
+						escapedName, escapedName))
+				} else {
+					labelEqualPredicates[escapedName] = escapedValue
+				}
+			case prompb.LabelMatcher_NEQ:
+				matchers = append(matchers, fmt.Sprintf("l.metric_labels->>'%s' != '%s'", escapedName, escapedValue))
+			case prompb.LabelMatcher_RE:
+				matchers = append(matchers, fmt.Sprintf("l.metric_labels->>'%s' ~ '%s'", escapedName, anchorValue(escapedValue)))
+			case prompb.LabelMatcher_NRE:
+				matchers = append(matchers, fmt.Sprintf("l.metric_labels->>'%s' !~ '%s'", escapedName, anchorValue(escapedValue)))
+			default:
+				return "", fmt.Errorf("unknown match type %v", m.Type)
+			}
+		}
+	}
+	equalsPredicate := ""
+
+	if len(labelEqualPredicates) > 0 {
+		labelsJSON, err := json.Marshal(labelEqualPredicates)
+
+		if err != nil {
+			return "", err
+		}
+		equalsPredicate = fmt.Sprintf(" AND l.metric_labels @> '%s'", labelsJSON)
+	}
+
+	matchers = append(matchers, fmt.Sprintf("v.metric_time >= '%v'", toTimestamp(q.StartTimestampMs).Format(time.RFC3339)))
+	matchers = append(matchers, fmt.Sprintf("v.metric_time <= '%v'", toTimestamp(q.EndTimestampMs).Format(time.RFC3339)))
+
+	return fmt.Sprintf("SELECT v.metric_time, l.metric_name, v.metric_value, l.metric_labels FROM metric_values v, metric_labels l WHERE l.metric_id = v.metric_id and %s %s ORDER BY v.metric_time",
+		strings.Join(matchers, " AND "), equalsPredicate), nil
+}
+
+func (c *Client) buildCommand(q *prompb.Query) (string, error) {
+	return c.buildQuery(q)
+}
+
+func escapeValue(str string) string {
+	return strings.Replace(str, `'`, `''`, -1)
+}
+
+// anchorValue adds anchors to values in regexps since the PromQL docs
+// state that "Regex-matches are fully anchored", e.g. "foo.*" becomes "^foo.*$".
+func anchorValue(str string) string {
+	l := len(str)
+
+	if l == 0 || (str[0] == '^' && str[l-1] == '$') {
+		return str
+	}
+
+	if str[0] == '^' {
+		return fmt.Sprintf("%s$", str)
+	}
+
+	if str[l-1] == '$' {
+		return fmt.Sprintf("^%s", str)
+	}
+
+	return fmt.Sprintf("^%s$", str)
+}
+
+// Name identifies the client as a PostgreSQL client.
+func (c Client) Name() string {
+	return "PostgreSQL"
+}
diff --git a/start.sh b/start.sh
new file mode 100755
index 0000000..721f016
--- /dev/null
+++ b/start.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+if [[ "${DATABASE_URL}" == "" ]]; then
+  echo 'Missing DATABASE_URL'
+  echo 'example -e DATABASE_URL="user= password= host= port= database="'
+  exit 1
+fi
+
+function shutdown() {
+  pkill -SIGINT postgresql-prometheus-adapter
+}
+
+# Forward SIGINT and SIGTERM (docker stop) to the adapter, which handles
+# SIGINT for graceful shutdown.
+trap shutdown INT TERM
+
+adapter_send_timeout=${adapter_send_timeout:-'30s'}
+web_listen_address="${web_listen_address:-':9201'}"
+web_telemetry_path="${web_telemetry_path:-'/metrics'}"
+log_level="${log_level:-'info'}"
+log_format="${log_format:-'logfmt'}"
+pg_partition="${pg_partition:-'hourly'}"
+pg_commit_secs=${pg_commit_secs:-30}
+pg_commit_rows=${pg_commit_rows:-20000}
+pg_threads="${pg_threads:-1}"
+parser_threads="${parser_threads:-5}"
+
+echo /postgresql-prometheus-adapter \
+  --adapter-send-timeout=${adapter_send_timeout} \
+  --web-listen-address=${web_listen_address} \
+  --web-telemetry-path=${web_telemetry_path} \
+  --log.level=${log_level} \
+  --log.format=${log_format} \
+  --pg-partition=${pg_partition} \
+  --pg-commit-secs=${pg_commit_secs} \
+  --pg-commit-rows=${pg_commit_rows} \
+  --pg-threads=${pg_threads} \
+  --parser-threads=${parser_threads}
+
+# Run the adapter in the background and wait on it: bash delivers trapped
+# signals only between foreground commands, so a plain foreground run would
+# keep the trap from firing until the adapter had already exited.
+/postgresql-prometheus-adapter \
+  --adapter-send-timeout=${adapter_send_timeout} \
+  --web-listen-address=${web_listen_address} \
+  --web-telemetry-path=${web_telemetry_path} \
+  --log.level=${log_level} \
+  --log.format=${log_format} \
+  --pg-partition=${pg_partition} \
+  --pg-commit-secs=${pg_commit_secs} \
+  --pg-commit-rows=${pg_commit_rows} \
+  --pg-threads=${pg_threads} \
+  --parser-threads=${parser_threads} &
+child=$!
+wait $child
+# wait again in case the first wait was interrupted by a trapped signal
+wait $child
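For a quick local smoke test, the image can be built and run roughly as follows (a sketch: the image tag, host and credentials are placeholders; `DATABASE_URL` uses the keyword format expected by `start.sh`, and `pg_partition` is one of the environment overrides it supports):

```bash
docker build -t prometheus-pg-adapter:dev .

docker run --rm -p 9201:9201 \
  -e DATABASE_URL="user=prometheus password=secret host=db.example.com port=5432 database=prometheus" \
  -e pg_partition="daily" \
  prometheus-pg-adapter:dev
```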