maildir tidying
Table of Contents
1. Introduction
cleanmbox
applies stored email pattern-directed rules to mail in local maildir storage.
Requires the directory tree to follow the maildir++
structure.
Features:
- match on regex for sender, subject
- patterns for email more than n-days old
- handles utf-8 encoded subject text
Rules are supplied from a separate text file; the default location is ~/.config/cleanmbox/rules.csv
1.1. Usage
$ cleanmbox
apply email handling rules from CLEANMBOX_RULES_FILE to mail in MAILDIR
1.2. Environment Variables
- MAILDIR
- local mail storage directory [=~/.mail=]
- XDG_CONFIG_HOME
- root directory for configuration files [=~/.config=]
- CLEANMBOX_CONFIG_DIR
- directory containing rules file [=XDG_CONFIG_HOME/cleanmbox=]
- CLEANMBOX_RULES_FILE
- rules file [=rules.csv=]
1.3. Rules
Each rule has six ordered, space-separated fields. A rule matches if and only if all of its clauses match.
- channel
- clause identifies an email channel (a top-level subdirectory of
MAILDIR
)
- from-regex
- clause applies this regex to email
from
header. "-" matches unconditionally
- subject-regex
- clause applies this regex to email
subject
header. "-" matches unconditionally
- timetolivedays
- clause accepts email with
date
header more than this many days old
- disposition
- direct handling of a message matching
channel
, from-regex
, subject-regex
and timetolivedays
.
- keep
- accept message and continue
- move
- move message to folder
dest-folder
- delete
- permanently delete message
- dest-folder
- destination folder when
disposition
is move
# --------------------------------------------------------------------------------
# columns:
#   channel from-regex subject-regex timetolivedays disposition dest-folder
#
# '-' ignores field
#
# within each channel, rules are evaluated in the order they appear here
# --------------------------------------------------------------------------------

# security (alerts + confirmations)
gmail noreply@github.com 'your account' 0 move security

# travel
gmail jetblueairways@info.jetblue.com 'booking confirmation' 0 move travel

# junk
gmail no-reply@marketing.lyftmail.com - 0 delete
hushmail loreleyr0@b.mail-zr.com - 0 delete
1.4. Script
cleanmbox.py
script (written for python 3.10.3):
#!/usr/bin/env python
"""cleanmbox: apply pattern-directed rules to mail in local maildir storage.

Rules are read from a space-separated text file (see load_rules) and grouped
by channel, where a channel is a top-level subdirectory of MAILDIR. For every
message in a channel the rules are tried in file order; the first rule whose
clauses all match decides what happens to the message (keep, move, delete).
"""

import base64
import codecs
import csv
import datetime
import email.errors
import email.header
import email.utils
import mailbox
import os
import pathlib
import re
import sys
from collections import namedtuple

# rule fields:
## (not yet) - mbox     path (relative to MAILDIR) to mailbox e.g. hushmail for ~/.mail/hushmail
# - from_regex     (optional) regex for email 'from' address (rule ignores messages that don't match)
# - subject_regex  (optional) regex to apply to message subject (rule ignores messages that don't match)
# - age_days       (optional) act when this many days old (rule ignores messages less than this many days old)
# - disposition    keep|delete|move
# - dest_folder    (optional) destination folder when .disposition=move  e.g. 'travel' for ${mbox}/.travel
#
# An optional field holds None when the rules file gives '-' for it.
#
Rule = namedtuple('Rule', 'from_regex subject_regex age_days disposition dest_folder')

# naive datetime for 1970-01-01T00:00:00, interpreted as UTC
epoch_tm = datetime.datetime(1970, 1, 1)

SECONDS_PER_DAY = 24 * 3600
d30_sec = 30 * SECONDS_PER_DAY
d120_sec = 4 * d30_sec

# a message must be at least this many seconds older than the age cutoff to match
AGE_SLACK_SEC = 60


def datetime2float(tm):
    """Convert a datetime to seconds since the Unix epoch.

    Timezone-aware datetimes are converted exactly.  Naive datetimes are
    taken to be expressed in UTC.
    """
    if tm.tzinfo is not None and tm.utcoffset() is not None:
        return tm.timestamp()
    return (tm - epoch_tm).total_seconds()


def _header_text(value):
    """Return a header value as str; a missing header (None) becomes ''."""
    return '' if value is None else str(value)


def _decode_header_text(value):
    """Return header text with any RFC 2047 encoded-words decoded.

    Undecodable bytes are replaced rather than raising, and an unknown
    charset falls back to ASCII, so a malformed subject can never abort a run.
    """
    text = _header_text(value)
    try:
        chunks = email.header.decode_header(text)
    except email.errors.HeaderParseError:
        return text

    decoded = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            try:
                decoded.append(chunk.decode(charset or 'ascii', errors='replace'))
            except LookupError:
                # charset name python does not know about
                decoded.append(chunk.decode('ascii', errors='replace'))
        else:
            # decode_header() hands back str when nothing was encoded
            decoded.append(chunk)
    return ''.join(decoded)


def _message_epoch_sec(raw_date):
    """Return a 'date' header as epoch seconds, or None if missing/unparseable."""
    parsed = email.utils.parsedate_tz(_header_text(raw_date))
    if parsed is None:
        return None
    try:
        return email.utils.mktime_tz(parsed)
    except (OverflowError, ValueError):
        return None


def _rule_matches(rule, from_email, subject, msg_sec, asof_sec):
    """Return True iff every clause of rule accepts the message.

    A clause whose rule field is None accepts unconditionally.  The age
    clause rejects a message whose date is unknown (msg_sec is None): we
    never act on age without knowing the age.
    """
    if rule.from_regex is not None and not re.search(rule.from_regex, from_email, re.IGNORECASE):
        return False
    if rule.subject_regex is not None and not re.search(rule.subject_regex, subject, re.IGNORECASE):
        return False
    if rule.age_days is not None:
        if msg_sec is None:
            return False
        if msg_sec + AGE_SLACK_SEC >= asof_sec - rule.age_days * SECONDS_PER_DAY:
            return False
    return True


def _discard_message(mbox, key):
    """Remove message key from mbox, releasing the lock even on failure."""
    mbox.lock()
    try:
        mbox.discard(key)
        mbox.flush()
    finally:
        mbox.unlock()


def _apply_disposition(mbox, key, msg, rule):
    """Carry out rule.disposition for message msg stored in mbox under key.

    Raises Exception when a 'move' rule has no destination folder or the
    destination folder cannot be established.
    """
    if rule.disposition == 'keep':
        # nothing to do; matching a keep rule just shields the message from later rules
        return

    if rule.disposition == 'move':
        if not rule.dest_folder:
            raise Exception("expected non-null rule.dest_folder with rule.disposition='move'")

        dest_mdir = establish_folder(mbox, rule.dest_folder)
        if dest_mdir is None:
            raise Exception("failed to establish maildir object [{mdir}] for folder [{folder}]".format(mdir=dest_mdir,
                                                                                                      folder=rule.dest_folder))
        dest_mdir.lock()
        try:
            dest_mdir.add(msg)
            dest_mdir.flush()
        finally:
            dest_mdir.unlock()

        # copy is safely in the destination; now delete message from original location
        _discard_message(mbox, key)
    elif rule.disposition == 'delete':
        _discard_message(mbox, key)


def channel_apply_rules(channel, mbox, ruleset, asof_sec, verbose_flag, max_email=99999):
    """Apply ruleset to the messages in one channel.

    channel       channel name (used for reporting only)
    mbox          mailbox.Maildir holding the channel's messages
    ruleset       list of Rule, tried in order; first match wins
    asof_sec      current time, in seconds since the epoch
    verbose_flag  if true, also report messages that no rule matched
    max_email     visit at most this many messages

    Each matched message is reported on stdout before its rule's disposition
    is carried out.  Malformed or vanished messages are skipped.
    """
    # mbox.keys() is a snapshot, so deleting/moving messages cannot disturb the iteration
    for i_email, key in enumerate(mbox.keys(), start=1):
        if i_email > max_email:
            break

        try:
            msg = mbox[key]
        except (email.errors.MessageParseError, KeyError):
            continue  # ignoring malformed (or concurrently removed) message

        from_email = email.utils.parseaddr(_header_text(msg['from']))[1]
        msg_sec = _message_epoch_sec(msg['date'])
        msg_subject = _decode_header_text(msg['subject'])

        matched_rule = None
        for rule in ruleset:
            if _rule_matches(rule, from_email, msg_subject, msg_sec, asof_sec):
                matched_rule = rule
                break  # a handled message can't be the target of any other rules

        if matched_rule is None:
            if verbose_flag:
                print(":channel {channel} :i {i_email} :key {key} :from {from_email} :when {when} :subject [{subject}] -> no rule"
                      .format(channel=channel,
                              i_email=i_email,
                              key=key,
                              from_email=from_email,
                              when=msg['date'],
                              subject=msg_subject))
            continue

        print(":channel {channel} :i {i_email} :key {key} :from {from_email} :when {when} :subject [{subject}] -> {disposition} [{dest_folder}]"
              .format(channel=channel,
                      i_email=i_email,
                      key=key,
                      from_email=from_email,
                      when=msg['date'],
                      subject=msg_subject,
                      disposition=matched_rule.disposition,
                      dest_folder=matched_rule.dest_folder))

        _apply_disposition(mbox, key, msg, matched_rule)


def establish_folder(mbox, folder_name):
    """Return the maildir folder folder_name under mbox, creating it if absent."""
    try:
        return mbox.get_folder(folder_name)
    except mailbox.NoSuchMailboxError:
        return mbox.add_folder(folder_name)


# usage:
#   reader = csv.reader(strip_comments(open('/foo/bar.csv')))
#
def strip_comments(ix):
    """Yield the lines of ix, skipping blank lines and lines beginning with '#'."""
    for line in ix:
        if line.startswith('#'):
            continue  # ignore lines beginning with '#'
        if not line.strip():
            continue  # ignore blank lines
        yield line


def load_rules(rulesfile):
    """Read rulesfile and return a dict: channel -> [Rule], in file order.

    e.g.
      gmail no-reply@foo.com - 0 move bar
    moves:
    - any email with from field matching no-reply@foo.com (treated as regex),
    - with any subject ('-' ignores the field)
    - to folder bar

    A '-' (or absent) from-regex, subject-regex or timetolivedays field is
    stored as None, meaning that clause matches unconditionally.  Raises
    ValueError if timetolivedays is neither '-' nor an integer.

    see [[https://docs.python.org/3/library/csv.html]]
    """
    print("cleanmbox: loading rules from [{rulesfile}]".format(rulesfile=rulesfile))

    # dict: channel -> [Rule]
    allrules_dict = {}

    with open(rulesfile, 'r', newline='') as csvrulefile:
        reader = csv.DictReader(strip_comments(csvrulefile),
                                fieldnames=['channel', 'from_regex', 'subject_regex', 'daystolive', 'disposition', 'dest_folder'],
                                delimiter=' ',
                                quotechar='\'',
                                doublequote=False,
                                skipinitialspace=True,
                                quoting=csv.QUOTE_MINIMAL)

        for row in reader:
            from_regex = None if row['from_regex'] == '-' else row['from_regex']
            subject_regex = None if row['subject_regex'] == '-' else row['subject_regex']
            # short rows leave trailing fields as None
            daystolive = None if row['daystolive'] in ('-', None) else int(row['daystolive'])

            rule = Rule(from_regex, subject_regex, daystolive, row['disposition'], row['dest_folder'])
            allrules_dict.setdefault(row['channel'], []).append(rule)

    return allrules_dict


def check_rules(allrules_dict):
    """Raise Exception if any rule's disposition is not keep, move or delete."""
    for ruleset in allrules_dict.values():
        for rule in ruleset:
            if rule.disposition not in ('keep', 'move', 'delete'):
                raise Exception("expected disposition keep|move|delete with rule [{rule}]".format(rule=rule))


def print_rules(allrules_dict):
    """Print every rule, one per line, grouped by channel (debugging aid)."""
    for channel, ruleset in allrules_dict.items():
        print('rules:')
        for rule in ruleset:
            print('channel={channel}, rule={rule}'.format(channel=channel, rule=rule))


def main():
    """Locate the rules file from the environment and apply it to MAILDIR.

    Environment variables (defaults in brackets):
      MAILDIR               local mail storage directory [~/.mail]
      XDG_CONFIG_HOME       root directory for configuration files [~/.config]
      CLEANMBOX_CONFIG_DIR  directory containing rules file [XDG_CONFIG_HOME/cleanmbox]
      CLEANMBOX_RULES_FILE  rules file [rules.csv]

    Raises Exception if no rules file can be found.
    """
    homedir = pathlib.Path.home()

    # true epoch seconds, comparable with the message times from email.utils.mktime_tz()
    asof_sec = datetime2float(datetime.datetime.now(datetime.timezone.utc))

    maildir = os.environ.get('MAILDIR', os.path.join(homedir, ".mail"))
    confighomedir = os.environ.get('XDG_CONFIG_HOME', os.path.join(homedir, ".config"))
    configdir = os.environ.get('CLEANMBOX_CONFIG_DIR', os.path.join(confighomedir, "cleanmbox"))
    rulesfile = os.environ.get('CLEANMBOX_RULES_FILE', "rules.csv")

    # 1. check CLEANMBOX_CONFIG_DIR;  2. fall back to rulesfile taken as a path by itself
    rulespath1 = os.path.join(configdir, rulesfile)
    if os.path.isfile(rulespath1):
        rulespath = rulespath1
    elif os.path.isfile(rulesfile):
        rulespath = rulesfile
    else:
        raise Exception("failed to read rules file (searched [{rulespath1}], [{rulespath}]). See XDG_CONFIG_HOME|CLEANMBOX_CONFIG_DIR|CLEANMBOX_RULES_FILE".format(rulespath1=rulespath1,
                                                                                                                                                                  rulespath=rulesfile))

    # read rules
    allrules_dict = load_rules(rulespath)

    # simple sanity checks
    check_rules(allrules_dict)

    for channel, ruleset in allrules_dict.items():
        mbox = mailbox.Maildir(os.path.join(maildir, channel))
        channel_apply_rules(channel, mbox, ruleset, asof_sec, verbose_flag=True)


if __name__ == "__main__":
    main()