April 03, 2024 | 5 minutes read
Our Client
Our client builds reliability and maintenance solutions for manufacturing plants. The data collected from these plants is high in both volume and frequency.
Business Need
- The database kept running out of storage space, and we had hit the AWS limit on the maximum disk size allowed. We needed a solution that would let us scale horizontally, minimize downtime, and keep performance optimal.
- The database versions needed upgrading. The existing database ran PostgreSQL 11 and TimescaleDB 1.7, neither of which is supported anymore, so we had to move to versions that are still maintained.
Challenges
- Critical data was inserted every millisecond. We could not afford to lose this data, so we had to aim for minimal or zero downtime.
- The volume of data was around 40TB and continuously growing.
- We could not use a managed service.
- The scope for data deletion was tricky: while not all of the data was needed for queries, it still had to be retained for compliance reasons.
- Sharding emerged as an integral component of our solution. We decided to use the organization as part of the sharding key. However, this column was not directly present in any table, and altering the existing tables had certain restrictions in the legacy TimescaleDB version.
- Logical replication was a challenge as our source and destination had to be on different versions of the database and we also had to shard data across multiple nodes.
Prerequisite understanding
We need to explain a few terms for the following sections to make sense. Feel free to skip the parts you already know.
Time-series data
Time-series data represents how a system, process, or behavior changes over time. For example, if you are taking measurements from a temperature gauge every five minutes, you are collecting time-series data. Another common example is stock price changes or even the battery life of your smartphone. As these measurements change over time, each data point is recorded alongside its timestamp, allowing it to be measured, analyzed, and visualized.
While databases have always had time fields, specialized databases for handling time series data can make your database work more effectively. Examples of various time-series databases are TimescaleDB, InfluxDB, and Prometheus.
TimescaleDB
TimescaleDB is an extension for PostgreSQL that enables time-series workloads, increasing ingest, query, storage, and analytics performance. This characteristic of being built on top of PostgreSQL is one of its key differentiating factors from other time series databases. It allows you to use the familiarity of battle-tested PostgreSQL features like SQL with its query optimizer (as opposed to another domain-specific language like flux in InfluxDB), replication, and interoperability with other PostgreSQL extensions.
Hypertable
Hypertables are a TimescaleDB feature: they are PostgreSQL tables that automatically partition your data by time. You interact with hypertables the same way as with regular PostgreSQL tables, but they come with extra features that make managing time-series data much easier. When you create and use a hypertable, it automatically partitions data by time, and optionally by space.
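For illustration, here is a minimal sketch of creating a hypertable from Python with psycopg2. The table, columns, and connection string are hypothetical stand-ins, not our client's actual schema:

```python
import psycopg2

# Hypothetical connection string and schema, for illustration only.
conn = psycopg2.connect("postgresql://postgres:secret@localhost:5432/metrics")
with conn, conn.cursor() as cur:
    # A plain PostgreSQL table holding one reading per device per timestamp.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS sensor_readings (
            time        TIMESTAMPTZ NOT NULL,
            device_id   TEXT        NOT NULL,
            temperature DOUBLE PRECISION
        );
    """)
    # Convert it into a hypertable partitioned by the time column.
    cur.execute(
        "SELECT create_hypertable('sensor_readings', 'time', if_not_exists => TRUE);"
    )
conn.close()
```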
Sharding
Database sharding is the process of storing a large database across multiple machines. A single machine, or database server, can store and process only a limited amount of data. Database sharding overcomes this limitation by splitting data into smaller chunks, called shards, and storing them across several database servers. All database servers usually have the same underlying technologies, and they work together to store and process large volumes of data.
IOPS vs Throughput
Both IOPS and throughput are important characteristics for measuring the performance of any storage system.
IOPS: IOPS measures the number of input/output operations that can be performed by a storage device in one second. These operations typically include reads and writes.
Throughput: Throughput refers to the amount of data that can be transferred per unit of time (usually measured in MB/s).
Choosing the right parameters for the above is crucial for any database storage system.
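As a rough rule of thumb, and assuming a fixed I/O size, throughput ≈ IOPS × I/O size. A quick back-of-the-envelope calculation with illustrative numbers (not our actual volume settings):

```python
# Rough relationship between IOPS and throughput, assuming a fixed I/O size.
# The numbers below are illustrative only.
iops = 3000          # I/O operations per second
io_size_kib = 128    # size of each operation in KiB

throughput_mib_s = iops * io_size_kib / 1024
print(f"~{throughput_mib_s:.0f} MiB/s")  # prints: ~375 MiB/s
```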
WAL (Write-Ahead Logging)
Write-Ahead Logging is a standard method for ensuring data integrity. Briefly, WAL’s central concept is that changes to data files (where tables and indexes reside) must be written after those changes have been logged, that is, after WAL records describing the changes have been flushed to permanent storage.
Using WAL results in a significantly reduced number of disk writes, because only the WAL file needs to be flushed to disk to guarantee that a transaction is committed, rather than every data file changed by the transaction.
Solution
Analysis Phase
We spent a month analyzing the tables and their query usage patterns across Java services, Kafka consumers, and Python data science jobs. We migrated a subset of this data into a new database and ran benchmarks against various use cases.
The following are a few experiments we tried:
- We set up some simple Python scripts that exported this data to CSV and imported it into a database from CSV. We went with CSV to keep the process database-agnostic, as we were also considering options outside of TimescaleDB.
- We found that TimescaleDB had a multi-node setup, which seemed like the ideal solution to our problem. We successfully migrated data to it and ran a few benchmarks, but it was slower than the existing database, most likely due to the network latency of communication between nodes. The data was evenly distributed across nodes: for example, with 1,000 records in the last second and 5 nodes, each node would hold 200 records. This meant that even a query over a small date range incurred the network latency of multiple nodes. On further analysis and scouring of community channels, we also discovered that multi-node support was in the process of being deprecated by TimescaleDB.
- We discovered a PostgreSQL feature called tablespaces (https://www.postgresql.org/docs/current/manage-ag-tablespaces.html), which TimescaleDB can also leverage. This would let us store separate tables on separate EBS volumes. We ran benchmarks here and found performance to be identical to the existing setup (a short sketch follows this list).
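Here is a rough sketch of that tablespace approach. The mount point, tablespace name, hypertable, and connection string below are hypothetical; `attach_tablespace` is the TimescaleDB helper that lets new chunks of a hypertable land on the extra volume:

```python
import psycopg2

# Hypothetical DSN, mount point, and hypertable name, for illustration only.
conn = psycopg2.connect("postgresql://postgres:secret@localhost:5432/metrics")
conn.autocommit = True  # CREATE TABLESPACE cannot run inside a transaction block
with conn.cursor() as cur:
    # Register an EBS volume mounted at /mnt/ebs_vol_2 as a PostgreSQL tablespace.
    cur.execute("CREATE TABLESPACE ebs_vol_2 LOCATION '/mnt/ebs_vol_2';")
    # Allow TimescaleDB to place new chunks of this hypertable on that volume.
    cur.execute("SELECT attach_tablespace('ebs_vol_2', 'sensor_readings');")
conn.close()
```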
The following are a few observations we made:
- The distribution of data across tables was very uneven, with 5% of tables accounting for 95% of the storage space.
- One important characteristic of time-series data is that data is always appended and rarely (preferably never) updated. We observed the same pattern: for most tables, we could assume that data would only be inserted and never updated.
- Every query filtered data within a certain time range. Furthermore, 90% of our queries filtered on both a time range and an identifier column that could be mapped to an organization. This information later became crucial in planning our sharding strategy.
- For most tables, only one year's worth of data needed to be queryable, but we could not discard the remaining data for compliance reasons.
- Even within one year of data, most of the queries on older data were aggregate queries.
- The database was using io2 EBS volumes. However, we did not need the high IOPS and throughput that io2 provides; we had chosen it only because no other volume type could store 40TB of data. This was a significant opportunity for cost optimization.
The following are some conclusions we reached:
- We decided to continue with TimescaleDB, as most of the application logic relied heavily on the SQL interface, which alternatives like InfluxDB lack.
- We realized we could leverage tablespaces to distribute our tables across multiple smaller volumes, helping us stay within the size limits imposed by AWS EBS and spend only as much as required.
- We also decided to shard by organization, but instead of giving every organization its own shard, we placed the top 5 organizations on separate shards and kept the rest of the smaller organizations on a common shard. A general hash-based sharding approach would have led to an uneven distribution of load. Also, because most of our tables did not directly have an organization column, and mapping a row to an organization required certain business-specific logic, we decided to go with a custom application-level sharding solution.
- We would set retention policies to keep only one year's worth of data and store the remaining data as CSV files in AWS Glacier storage.
- We would set the compression policies offered by TimescaleDB for older data, which would make the aggregate queries faster and keep disk usage low (https://www.timescale.com/blog/building-columnar-compression-in-a-row-oriented-database/). A sketch of both policies follows this list.
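The sketch below shows what these two policies look like on TimescaleDB 2.x, again with a hypothetical hypertable and illustrative intervals rather than our client's actual configuration:

```python
import psycopg2

# Hypothetical DSN, hypertable, and intervals; tune these to the workload.
conn = psycopg2.connect("postgresql://postgres:secret@localhost:5432/metrics")
with conn, conn.cursor() as cur:
    # Retention: drop raw chunks once they are older than one year.
    cur.execute(
        "SELECT add_retention_policy('sensor_readings', INTERVAL '1 year');"
    )
    # Compression: enable columnar compression, segmenting by device.
    cur.execute("""
        ALTER TABLE sensor_readings SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'device_id'
        );
    """)
    # Compress chunks automatically once they are 30 days old.
    cur.execute(
        "SELECT add_compression_policy('sensor_readings', INTERVAL '30 days');"
    )
conn.close()
```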
Execution
We decided to execute the above conclusions in three phases:
- Migrate data to the newer TimescaleDB version with multiple volumes without sharding.
- Develop and integrate backward-compatible sharding code.
- Set up multiple database instances housing respective shards and migrate shard-level data to it.
Phase 1
- Set up a new multi-volume database.
- Set up parallel Kafka consumers to write data to both the old and new databases.
- Instead of exporting data for backfill from the old database, we set up a copy of the database using a snapshot and exported from there. This was to avoid affecting user experience due to any potential load caused by our exports.
- We rewrote the Python script used in the experimental phase as a CLI tool to backfill data. We used Celery to run multiple jobs in parallel and to monitor the status of these jobs, and we added features to compare data between the two databases (a simplified sketch of one such job follows this list).
- We also used the CLI tool to take daily CSV exports of this data and archive them in AWS Glacier. This meant we could start deleting stale data from the database while still retaining it in Glacier.
- We went live with the above and arrived at a multi-volume TimescaleDB database running the latest version. With this in place, and with the confidence and experience gained from the exercise, the next two phases were significantly less risky.
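As mentioned above, the backfill ran as parallel Celery jobs. Below is a simplified sketch of one such job; the broker, connection strings, table names, and the assumption of a `time` column are all hypothetical, and the real tool also handled comparisons, retries, and Glacier uploads:

```python
import io
from datetime import date, timedelta

import psycopg2
from celery import Celery

# Hypothetical broker and connection strings, for illustration only.
app = Celery("backfill", broker="redis://localhost:6379/0")

OLD_DSN = "postgresql://postgres:secret@old-snapshot-host:5432/metrics"
NEW_DSN = "postgresql://postgres:secret@new-host:5432/metrics"


@app.task
def backfill_day(table: str, day: str) -> int:
    """Copy one day of `table` from the snapshot copy into the new database."""
    start = date.fromisoformat(day)
    end = start + timedelta(days=1)
    buf = io.StringIO()

    # Export the day's rows to an in-memory CSV using COPY.
    with psycopg2.connect(OLD_DSN) as old_conn, old_conn.cursor() as cur:
        cur.copy_expert(
            f"COPY (SELECT * FROM {table} "
            f"WHERE time >= '{start}' AND time < '{end}') TO STDOUT WITH CSV",
            buf,
        )

    # Import the CSV into the same table on the new database.
    buf.seek(0)
    with psycopg2.connect(NEW_DSN) as new_conn, new_conn.cursor() as cur:
        cur.copy_expert(f"COPY {table} FROM STDIN WITH CSV", buf)

    return len(buf.getvalue())  # bytes transferred, handy for monitoring


# Example: queue a month of daily backfill jobs to run in parallel.
# for d in range(1, 31):
#     backfill_day.delay("sensor_readings", f"2023-06-{d:02d}")
```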
Phase 2
- We developed a library in Java and Python for application-level sharding. It applied the necessary business logic to identify which organization a row belonged to and, based on that, decided where to run the query (a simplified sketch of the routing idea follows this list).
- We tested this extensively in lower environments, including a scenario where all shards resided on the same database; this exercised the sharding logic without requiring physically separate shards.
- We went live with the above and were quite confident in our sharding code.
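The routing idea behind that library looks roughly like the sketch below. The shard names, DSNs, and the device-to-organization mapping are hypothetical stand-ins for the real, more involved business logic:

```python
from dataclasses import dataclass

# Hypothetical shard configuration: the largest organizations get dedicated
# shards, and every other organization shares a common "default" shard.
SHARD_DSNS = {
    "org_a": "postgresql://user:secret@shard-org-a:5432/metrics",
    "org_b": "postgresql://user:secret@shard-org-b:5432/metrics",
    "default": "postgresql://user:secret@shard-default:5432/metrics",
}


@dataclass
class ShardRouter:
    """Resolve a row or query to the shard holding its organization's data."""

    device_to_org: dict  # stand-in for the business-specific mapping logic

    def org_for_device(self, device_id: str) -> str:
        # In reality this applied business rules; here it is a simple lookup.
        return self.device_to_org.get(device_id, "unknown")

    def dsn_for_device(self, device_id: str) -> str:
        org = self.org_for_device(device_id)
        # Top organizations have dedicated shards; everything else is shared.
        return SHARD_DSNS.get(org, SHARD_DSNS["default"])


# Usage: the consumer or query layer asks the router where to read or write.
router = ShardRouter(device_to_org={"dev-001": "org_a", "dev-042": "org_b"})
assert router.dsn_for_device("dev-001") == SHARD_DSNS["org_a"]
assert router.dsn_for_device("dev-999") == SHARD_DSNS["default"]
```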
Phase 3
- We set up multiple database instances for the specific shards we needed. This time we chose cheaper EBS volumes like gp3, because each instance would hold significantly less data and we never needed the extremely high IOPS that io2 provided.
- We again started parallel consumers, but this time the new consumers had a different sharding configuration from the old ones. The old consumers continued to write to the existing database, while the new ones wrote to the sharded databases based on the updated configuration we set up (see the configuration sketch after this list).
- We added sharding awareness to our backfill script and started backfilling older data, and we made the same change to our data comparison scripts.
- After the backfill of data was complete, all we had to do was point our application to the new database, shut down the older consumers, and monitor. This was achieved without downtime.
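The configuration difference between the two consumer groups was essentially the following (names and DSNs are hypothetical): the old consumers saw a single shard pointing at the existing database, while the new consumers carried the full shard map:

```python
# Hypothetical configurations, for illustration only.
# Old consumers: everything still goes to the existing (legacy) database.
OLD_CONSUMER_SHARDS = {
    "default": "postgresql://user:secret@legacy-db:5432/metrics",
}

# New consumers: the large organizations are routed to their dedicated shards.
NEW_CONSUMER_SHARDS = {
    "org_a": "postgresql://user:secret@shard-org-a:5432/metrics",
    "org_b": "postgresql://user:secret@shard-org-b:5432/metrics",
    "default": "postgresql://user:secret@shard-default:5432/metrics",
}
```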
Results
- By switching from io2 to gp3 volumes, we were able to significantly reduce costs. Furthermore, we used a separate io2 volume for WAL, which meant the inserts would continue to be performant.
- By introducing sharding, performance improved significantly. The time taken for daily CSV exports went down from 12 hours, with frequent failures and retries, to 3-4 hours with zero failures.
- We were able to address concerns about running out of storage space by setting up multiple avenues of scalability via tablespaces and sharding.