UP | HOME

maildir tidying

Table of Contents

1. Introduction

cleanmbox applies stored, pattern-directed rules to email held in local maildir storage. It requires that the directory tree follow the Maildir++ structure.

Features:

  • match on regex for sender, subject
  • patterns for email more than n-days old
  • handles utf-8 encoded subject text

Rules are supplied from a separate text file; the default location is ~/.config/cleanmbox/rules.csv.

1.1. Usage

$ cleanmbox

apply email handling rules from CLEANMBOX_RULES_FILE to mail in MAILDIR

1.2. Environment Variables

MAILDIR
local mail storage directory [default: ~/.mail]
XDG_CONFIG_HOME
root directory for configuration files [default: ~/.config]
CLEANMBOX_CONFIG_DIR
directory containing rules file [default: XDG_CONFIG_HOME/cleanmbox]
CLEANMBOX_RULES_FILE
rules file [default: rules.csv]

1.3. Rules

Each rule has six ordered, space-separated fields. A rule matches if and only if all of its clauses match.

channel
clause identifies an email channel (a top-level subdirectory of MAILDIR)
from-regex
clause applies this regex to email from header. "-" matches unconditionally
subject-regex
clause applies this regex to email subject header. "-" matches unconditionally
timetolivedays
clause accepts email with date header more than this many days old
disposition
directs how to handle a message that matches channel, from-regex, subject-regex and timetolivedays; one of:
keep
leave the message where it is; later rules are still evaluated for this message
move
move message to folder dest-folder
delete
permanently delete message
dest-folder
destination folder when disposition is move
# --------------------------------------------------------------------------------
# columns:
#   channel from-regex subject-regex timetolivedays disposition dest-folder
#
# '-' ignores field
#
# within each channel,  rules are evaluated in the order they appear here
# --------------------------------------------------------------------------------

# security (alerts + confirmations)
gmail noreply@github.com 'your account' 0 move security

# travel
gmail jetblueairways@info.jetblue.com 'booking confirmation' 0 move travel

# junk
gmail no-reply@marketing.lyftmail.com - 0 delete
hushmail loreleyr0@b.mail-zr.com - 0 delete

1.4. Script

cleanmbox.py script (written for python 3.10.3):

#!/usr/bin/env python

import mailbox
import email.utils
import email.header
import pathlib
import datetime
import re
import csv
import base64
import codecs
import os
import sys
from collections import namedtuple

# rule fields:
## (not yet) - mbox           path (relative to MAILDIR) to mailbox  e.g. hushmail for ~/.mail/hushmail
# - from_regex     (optional) regex for email 'from' address (rule ignores messages that don't match)
# - subject_regex  (optional) regex to apply to message subject (rule ignores messages that don't match)
# - age_days       (optional) act when this many days old (rule ignores messages less than this many days old)
# - disposition    keep|move|delete
# - dest_folder    (optional) destination folder when .disposition=move e.g. 'travel' for ${mbox}/.travel
#
Rule = namedtuple('Rule', 'from_regex subject_regex age_days disposition dest_folder')

# unix epoch as a naive datetime expressed in UTC.
# built directly instead of via datetime.utcfromtimestamp(0), which is
# deprecated since python 3.12; the resulting value is the same.
epoch_tm = datetime.datetime(1970, 1, 1)

# 30 days and 120 days, in seconds
d30_sec = 30 * 24 * 3600
d120_sec = 4 * d30_sec

def datetime2float(tm):
    """Return the number of seconds from module-level epoch_tm to tm, as a float.

    epoch_tm is a naive datetime, so tm has to be naive as well (python
    refuses to subtract naive and aware datetimes).  The result only equals
    a POSIX timestamp when tm is expressed in UTC.
    """
    return (tm - epoch_tm).total_seconds()


def _header_text(value):
    """Return a header value as str; '' when the header is absent."""
    return '' if value is None else str(value)


def _decode_subject(raw_subject):
    """Return the subject as plain text, decoding RFC 2047 encoded words of any charset."""
    text = _header_text(raw_subject)

    if '=?' not in text:
        return text  # nothing that could be an encoded word

    try:
        return str(email.header.make_header(email.header.decode_header(text)))
    except (LookupError, UnicodeError, email.errors.HeaderParseError):
        # unknown charset or undecodable payload: fall back to the raw header text
        return text


def _message_epoch_sec(msg):
    """Return the message's Date header in seconds since the epoch, or None if absent/unparseable."""
    parsed = email.utils.parsedate_tz(_header_text(msg['date']))

    if parsed is None:
        return None

    try:
        return email.utils.mktime_tz(parsed)
    except (OverflowError, ValueError):
        return None


def _rule_matches(rule, from_email, msg_subject, msg_sec, asof_sec):
    """Return True when every clause present in rule accepts the message."""
    if (rule.from_regex is not None) and not re.search(rule.from_regex, from_email, re.IGNORECASE):
        return False

    if (rule.subject_regex is not None) and not re.search(rule.subject_regex, msg_subject, re.IGNORECASE):
        return False

    if rule.age_days is not None:
        if msg_sec is None:
            # age unknown: never treat an undated message as old enough
            return False
        if msg_sec + 60 >= asof_sec - rule.age_days * 24 * 3600:
            return False

    return True


def _remove_message(mbox, key):
    """Discard message key from mbox, releasing the lock even if the removal fails."""
    mbox.lock()
    try:
        mbox.discard(key)
        mbox.flush()
    finally:
        mbox.unlock()


def channel_apply_rules(channel, mbox, ruleset, asof_sec, verbose_flag):
    """Apply ruleset, in order, to each message in mbox.

    channel       channel name; only used in printed output
    mbox          mailbox object holding the channel's messages
    ruleset       sequence of Rule; evaluated in order for each message
    asof_sec      reference time (seconds since epoch) for the age_days clause
    verbose_flag  when true, also print a line for messages that match no rule

    A matching 'move' or 'delete' rule removes the message from mbox and ends
    rule evaluation for that message.  A matching 'keep' rule takes no action
    and evaluation continues with the following rules.

    Messages with a missing or unparseable Date header never satisfy an
    age_days clause.  Missing From/Subject headers are treated as empty text.

    Raises Exception when a matching 'move' rule has no dest_folder, or when
    the destination folder cannot be established.
    """
    max_email = 99999

    # loop over first {max_email} emails in inbox.
    # keys are snapshotted up front because messages are removed while looping
    for i_email, key in enumerate(list(mbox.iterkeys()), start=1):
        if i_email > max_email:
            break

        try:
            msg = mbox[key]
        except (email.errors.MessageParseError, KeyError):
            continue  # ignoring malformed message, or one that vanished since the snapshot

        from_email = email.utils.parseaddr(_header_text(msg['from']))[1]
        msg_sec = _message_epoch_sec(msg)
        msg_subject = _decode_subject(msg['subject'])

        matched_some_rule = False

        for rule in ruleset:
            if not _rule_matches(rule, from_email, msg_subject, msg_sec, asof_sec):
                continue

            matched_some_rule = True

            print (":channel {channel} :i {i_email} :key {key} :from {from_email} :when {when} :subject [{subject}] -> {disposition} [{dest_folder}]"
                   .format(channel=channel,
                           i_email=i_email,
                           key=key,
                           from_email=from_email,
                           when=msg['date'],
                           subject=msg_subject,
                           disposition=rule.disposition,
                           dest_folder=rule.dest_folder))

            # act on matched rule
            if rule.disposition == 'keep':
                # no action; later rules are still evaluated
                continue

            if rule.disposition == 'move':
                if not rule.dest_folder:
                    raise Exception("expected non-null rule.dest_folder with rule.disposition='move'")

                dest_mdir = establish_folder(mbox, rule.dest_folder)

                if dest_mdir is None:
                    raise Exception("failed to establish maildir object [{mdir}] for folder [{folder}]".format(mdir=dest_mdir, folder=rule.dest_folder))

                dest_mdir.lock()
                try:
                    dest_mdir.add(msg)
                    dest_mdir.flush()
                finally:
                    dest_mdir.unlock()

                # also delete message from original location
                _remove_message(mbox, key)

                # moved message is gone from this mailbox; it can't be the
                # target of any other rules (otherwise it could be moved twice)
                break

            if rule.disposition == 'delete':
                _remove_message(mbox, key)

                # deleted message can't be the target of any other rules
                break

        if verbose_flag and (not matched_some_rule):
            print (":channel {channel} :i {i_email} :key {key} :from {from_email} :when {when} :subject [{subject}] -> no rule"
                   .format(channel=channel,
                           i_email=i_email,
                           key=key,
                           from_email=from_email,
                           when=msg['date'],
                           subject=msg_subject))


def establish_folder(mbox, folder_name):
    """Return folder folder_name of mbox, creating the folder when it does not exist yet."""
    try:
        return mbox.get_folder(folder_name)
    except mailbox.NoSuchMailboxError:
        # folder is missing: create it and hand back the new folder object
        return mbox.add_folder(folder_name)

# usage:
#   reader = csv.reader(strip_comments(open('/foo/bar.csv')))
#
def strip_comments (ix):
    """Lazily yield the lines of ix, dropping comment lines and blank lines.

    A comment line is one whose very first character is '#'; a '#' that
    follows leading whitespace does not make the line a comment.
    """
    yield from (text for text in ix
                if text[:1] != '#' and text.strip())

def load_rules(rulesfile):
    """Read rules from rulesfile and return them grouped by channel.

    rulesfile  path to a text file with one rule per line; fields are
               space-separated, may be quoted with ', and appear in the order
                 channel from_regex subject_regex daystolive disposition dest_folder
               Lines beginning with '#' and blank lines are skipped.

    Returns a dict: channel -> [Rule], with rules in file order.

    A field given as '-' (or left out / empty) is stored as None, meaning the
    corresponding clause is ignored.  This also holds for daystolive.

    Raises ValueError when daystolive is neither '-' nor an integer.
    """
    print ("cleanmbox: loading rules from [{rulesfile}]".format(rulesfile=rulesfile))

    def optional(field):
        # '-' is the explicit "ignore this field" marker; trailing fields
        # missing from a line are reported by csv.DictReader as None
        return None if field in (None, '', '-') else field

    # dict: channel -> [Rule]
    allrules_dict = {}

    # e.g.
    #   gmail no-reply@foo.com - 0 move bar
    # moves:
    # - any email with from field matching no-reply@foo.com (treated as regex),
    # - with any subject ('-' treated as '.*')
    # - to folder bar
    #
    # see [[https://docs.python.org/3/library/csv.html]]
    #
    with open(rulesfile, 'r', newline='') as csvrulefile:
        reader = csv.DictReader(strip_comments(csvrulefile),
                                fieldnames=['channel', 'from_regex', 'subject_regex', 'daystolive', 'disposition', 'dest_folder'],
                                delimiter = ' ',
                                quotechar = '\'',
                                doublequote = False,
                                skipinitialspace = True,
                                quoting = csv.QUOTE_MINIMAL)

        for row in reader:
            channel = row['channel']

            days_text = optional(row['daystolive'])

            if days_text is None:
                daystolive = None
            else:
                try:
                    daystolive = int(days_text)
                except ValueError as err:
                    raise ValueError("expected integer or '-' for timetolivedays, got [{value}] in rule for channel [{channel}]"
                                     .format(value=days_text, channel=channel)) from err

            rule = Rule(optional(row['from_regex']),
                        optional(row['subject_regex']),
                        daystolive,
                        row['disposition'],
                        optional(row['dest_folder']))

            allrules_dict.setdefault(channel, []).append(rule)

    return allrules_dict

def check_rules(allrules_dict):
    """Sanity-check every rule in allrules_dict (dict: channel -> [Rule]).

    Raises Exception for the first rule whose disposition is not one of
    keep, move or delete.  Returns None when all rules pass.
    """
    known_dispositions = ('keep', 'move', 'delete')

    for ruleset in allrules_dict.values():
        for rule in ruleset:
            if rule.disposition in known_dispositions:
                continue

            raise Exception("expected disposition keep|move|delete with rule [{rule}]".format(rule=rule))

def print_rules(allrules_dict):
    """Print every rule in allrules_dict (dict: channel -> [Rule]) to stdout.

    Each channel starts with a 'rules:' line, followed by one line per rule.
    """
    for channel in allrules_dict:
        print('rules:')

        channel_rules = allrules_dict[channel]
        for rule in channel_rules:
            line = 'channel={channel}, rule={rule}'.format(channel=channel, rule=rule)
            print(line)

def main():
    """Locate the rules file, load and check the rules, then apply them to each channel.

    Environment variables consulted (with fallbacks):
      MAILDIR               local mail storage directory      [~/.mail]
      XDG_CONFIG_HOME       root of configuration files       [~/.config]
      CLEANMBOX_CONFIG_DIR  directory containing rules file   [XDG_CONFIG_HOME/cleanmbox]
      CLEANMBOX_RULES_FILE  rules file                        [rules.csv]

    The rules file is looked for first under the config directory, then at
    CLEANMBOX_RULES_FILE taken as a path on its own.

    Raises Exception when no rules file is found at either location.
    """
    homedir = pathlib.Path.home()

    # reference time as a true POSIX timestamp, so it is comparable with the
    # message times obtained from email.utils.mktime_tz.  (subtracting the naive
    # UTC epoch from naive local now() would be off by the local UTC offset)
    asof_sec = datetime.datetime.now().timestamp()

    maildir = os.environ.get('MAILDIR', os.path.join(homedir, ".mail"))

    #print("cleanmbox: maildir={maildir}".format(maildir=maildir))

    confighomedir = os.environ.get('XDG_CONFIG_HOME', os.path.join(homedir, ".config"))

    #print("cleanmbox: confighomedir={confighomedir}".format(confighomedir=confighomedir))

    configdir = os.environ.get('CLEANMBOX_CONFIG_DIR', os.path.join(confighomedir, "cleanmbox"))

    #print("cleanmbox: configdir={configdir}".format(configdir=configdir))

    rulesfile = os.environ.get('CLEANMBOX_RULES_FILE', "rules.csv")

    #print("cleanmbox: rulesfile={rulesfile}".format(rulesfile=rulesfile))

    # 1. check CLEANMBOX_CONFIG_DIR
    rulespath1 = os.path.join(configdir, rulesfile)

    if os.path.isfile(rulespath1):
        rulespath = rulespath1
    elif os.path.isfile(rulesfile):
        # 2. fall back to rulesfile as given
        rulespath = rulesfile
    else:
        # report both locations that were actually searched
        raise Exception("failed to read rules file (searched [{rulespath1}], [{rulespath}]). See XDG_CONFIG_HOME|CLEANMBOX_CONFIG_DIR|CLEANMBOX_RULES_FILE".format(rulespath1=rulespath1, rulespath=rulesfile))

    # read rules
    allrules_dict = load_rules(rulespath)

    # simple sanity checks
    check_rules(allrules_dict)

    #print_rules (allrules_dict)

    for channel, ruleset in allrules_dict.items():
        # each channel is a top-level subdirectory of maildir
        mbox = mailbox.Maildir(os.path.join(maildir, channel))

        #print('ruleset={ruleset}'.format(ruleset=ruleset))

        channel_apply_rules(channel,
                            mbox,
                            ruleset,
                            asof_sec,
                            verbose_flag=True)

# script entry point: the call is not guarded by a __name__ check, so it runs
# whenever this file is executed
main()

Author: Roland Conybeare

Created: 2024-09-08 Sun 18:01

Validate