import datetime
from collections import Iterable
from typing import Optional, Type, Union
import pytz
from dateutil.relativedelta import MO, relativedelta
from django.conf import settings
from django.db import IntegrityError, models
from django.db.models import Q
from django.utils import timezone
from pg_partitioning.shortcuts import double_quote, execute_sql, generate_set_indexes_tablespace_sql, single_quote
from .constants import (
DT_FORMAT,
SQL_APPEND_TABLESPACE,
SQL_ATTACH_LIST_PARTITION,
SQL_CREATE_LIST_PARTITION,
SQL_DETACH_PARTITION,
SQL_SET_TABLE_TABLESPACE,
PartitioningType,
PeriodType,
)
from .models import PartitionConfig, PartitionLog
class _PartitionManagerBase:
type = None
def __init__(self, model: Type[models.Model], partition_key: str, options: dict):
self.model = model
self.partition_key = partition_key
self.options = options
[docs]class TimeRangePartitionManager(_PartitionManagerBase):
"""Manage time-based partition APIs."""
type = PartitioningType.Range
@property
def config(self) -> PartitionConfig:
"""Get the latest PartitionConfig instance of this model.
In order to avoid the race condition, we used **select_for_update** when querying.
Returns:
PartitionConfig: The latest PartitionConfig instance of this model.
"""
try:
return PartitionConfig.objects.select_for_update().get(model_label=self.model._meta.label_lower)
except PartitionConfig.DoesNotExist:
try:
return PartitionConfig.objects.create(
model_label=self.model._meta.label_lower,
period=self.options.get("default_period", PeriodType.Month),
interval=self.options.get("default_interval"),
attach_tablespace=self.options.get("default_attach_tablespace"),
detach_tablespace=self.options.get("default_detach_tablespace"),
)
except IntegrityError:
return PartitionConfig.objects.select_for_update().get(model_label=self.model._meta.label_lower)
@property
def latest(self) -> Optional[PartitionLog]:
"""Get the latest PartitionLog instance of this model.
Returns:
Optional[PartitionLog]: The latest PartitionLog instance of this model or none.
"""
return self.config.logs.order_by("-id").first()
@classmethod
def _get_period_bound(cls, date_start, initial, addition_zeros=None, is_week=False, **kwargs):
zeros = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
if addition_zeros:
zeros.update(addition_zeros)
def func(): # lazy evaluation
if initial:
start = date_start.replace(**zeros)
if is_week:
start -= relativedelta(days=start.weekday())
else:
start = date_start
end = start + relativedelta(**kwargs, **zeros)
return start, end
return func
[docs] def create_partition(self, max_days_to_next_partition: int = 1) -> None:
"""The partition of the next cycle is created according to the configuration.
After modifying the period field, the new period will take effect the next time.
The start time of the new partition is the end time of the previous partition table,
or the start time of the current archive period when no partition exists.
For example:
the current time is June 5, 2018, and the archiving period is one year,
then the start time of the first partition is 00:00:00 on January 1, 2018.
Parameters:
max_days_to_next_partition(int):
If numbers of days remained in current partition is greater than ``max_days_to_next_partition``, no new partitions will be created.
"""
while True:
if max_days_to_next_partition > 0 and self.latest and timezone.now() < (self.latest.end - relativedelta(days=max_days_to_next_partition)):
return
partition_timezone = getattr(settings, "PARTITION_TIMEZONE", None)
if partition_timezone:
partition_timezone = pytz.timezone(partition_timezone)
date_start = timezone.localtime(self.latest.end if self.latest else None, timezone=partition_timezone)
initial = not bool(self.latest)
date_start, date_end = {
PeriodType.Day: self._get_period_bound(date_start, initial, days=+1),
PeriodType.Week: self._get_period_bound(date_start, initial, is_week=True, days=+1, weekday=MO),
PeriodType.Month: self._get_period_bound(date_start, initial, addition_zeros=dict(day=1), months=+1),
PeriodType.Year: self._get_period_bound(date_start, initial, addition_zeros=dict(month=1, day=1), years=+1),
}[self.config.period]()
partition_table_name = "_".join((self.model._meta.db_table, date_start.strftime(DT_FORMAT), date_end.strftime(DT_FORMAT)))
PartitionLog.objects.create(config=self.config, table_name=partition_table_name, start=date_start, end=date_end)
if not max_days_to_next_partition > 0:
return
[docs] def attach_partition(self, partition_log: Optional[Iterable] = None, detach_time: Optional[datetime.datetime] = None) -> None:
"""Attach partitions.
Parameters:
partition_log(Optional[Iterable]):
All partitions are attached when you don't specify partitions to attach.
detach_time(Optional[datetime.datetime]):
When the partition specifies the archive time, it will **not** be automatically archived until that time.
"""
if not partition_log:
partition_log = PartitionLog.objects.filter(config=self.config, is_attached=False)
for log in partition_log:
log.is_attached = True
log.detach_time = detach_time
log.save()
[docs] def detach_partition(self, partition_log: Optional[Iterable] = None) -> None:
"""Detach partitions.
Parameters:
partition_log(Optional[Iterable]):
Specify a partition to archive. When you don't specify a partition to archive, all partitions that meet the configuration rule are archived.
"""
if not partition_log:
if self.config.interval:
# fmt: off
period = {PeriodType.Day: {"days": 1},
PeriodType.Week: {"weeks": 1},
PeriodType.Month: {"months": 1},
PeriodType.Year: {"years": 1}}[self.config.period]
# fmt: on
now = timezone.now()
detach_timeline = now - self.config.interval * relativedelta(**period)
partition_log = PartitionLog.objects.filter(config=self.config, end__lt=detach_timeline, is_attached=True)
partition_log = partition_log.filter(Q(detach_time=None) | Q(detach_time__lt=now))
else:
return
for log in partition_log:
log.is_attached = False
log.detach_time = None
log.save()
[docs] def delete_partition(self, partition_log: Iterable) -> None:
"""Delete partitions.
Parameters:
partition_log(Iterable): The partitions to be deleted.
"""
for log in partition_log:
if log.config == self.config:
log.delete()
def _db_value(value: Union[str, int, bool, None]) -> str:
if value is None:
return "null"
return single_quote(value) if isinstance(value, str) else str(value)
[docs]class ListPartitionManager(_PartitionManagerBase):
"""Manage list-based partition APIs."""
type = PartitioningType.List
[docs] def create_partition(self, partition_name: str, value: Union[str, int, bool, None], tablespace: str = None) -> None:
"""Create partitions.
Parameters:
partition_name(str): Partition name.
value(Union[str, int, bool, None]): Partition key value.
tablespace(str): Partition tablespace name.
"""
sql_sequence = [
SQL_CREATE_LIST_PARTITION % {"parent": double_quote(self.model._meta.db_table), "child": double_quote(partition_name), "value": _db_value(value)}
]
if tablespace:
sql_sequence[0] += SQL_APPEND_TABLESPACE % {"tablespace": tablespace}
sql_sequence.extend(generate_set_indexes_tablespace_sql(partition_name, tablespace))
execute_sql(sql_sequence)
[docs] def attach_partition(self, partition_name: str, value: Union[str, int, bool, None], tablespace: str = None) -> None:
"""Attach partitions.
Parameters:
partition_name(str): Partition name.
value(Union[str, int, bool, None]): Partition key value.
tablespace(str): Partition tablespace name.
"""
sql_sequence = list()
if tablespace:
sql_sequence.append(SQL_SET_TABLE_TABLESPACE % {"name": double_quote(partition_name), "tablespace": tablespace})
sql_sequence.extend(generate_set_indexes_tablespace_sql(partition_name, tablespace))
sql_sequence.append(
SQL_ATTACH_LIST_PARTITION
% {"parent": double_quote(self.model._meta.db_table), "child": double_quote(partition_name), "value": single_quote(_db_value(value))}
)
execute_sql(sql_sequence)
[docs] def detach_partition(self, partition_name: str, tablespace: str = None) -> None:
"""Detach partitions.
Parameters:
partition_name(str): Partition name.
tablespace(str): Partition tablespace name.
"""
sql_sequence = [SQL_DETACH_PARTITION % {"parent": double_quote(self.model._meta.db_table), "child": double_quote(partition_name)}]
if tablespace:
sql_sequence.append(SQL_SET_TABLE_TABLESPACE % {"name": double_quote(partition_name), "tablespace": tablespace})
sql_sequence.extend(generate_set_indexes_tablespace_sql(partition_name, tablespace))
execute_sql(sql_sequence)