# 离线版本

# 一. 使用说明

# 1.1 数据说明

# 1.1.1 文件格式

文件类型 包格式
每日全量包 .tar.gz
每日更新包 .tar.gz
分钟级更新包 .tar.gz

# 1.1.2 压缩包解压内容

初始全量包:初始全量包解压后共1个文件:20220808.csv(邮箱后缀、完整邮箱)/20220808.txt(邮箱mx)

每日更新包:每日更新包解压后共1个文件:20220808.csv(邮箱后缀、完整邮箱)/20220808.txt(邮箱mx)

分钟级更新包:分钟级更新包解压后共1个文件:202208080000.csv(邮箱后缀、完整邮箱)/202208080000.txt(邮箱mx)

# 1.1.3 文件格式

csv文件分隔符采用制表符(\t)。

txt文件采用json格式。

# 1.1.4 接口白名单

为确保邮箱风险画像服务正常使用,请将以下接口列入公司域名白名单中。

名称 内容
api接口域名 https://black-mailbox.yazx.com
oss下载链接域名 http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com

1.1.5 使用建议

客户拉取全量包入库后,可以根据实际情况,自行选择日更包下载服务或者分钟包下载服务

# 1.2 邮箱后缀数据

1.2.1文件字段说明

全量包、日更包、分钟包文件格式相同

字段 字段类型 字段名称
email_suffix str 邮箱后缀
type int

邮箱后缀类型:
0 未知邮箱:未能识别该邮箱类型。
1 公共邮箱:指在邮箱门户网站注册的邮箱,使用者多为正常用户。
2 临时邮箱:指无需注册登录且只有几分钟到几小时不等有效性即可收发邮件的邮箱,使用者多为恶意用户。
3 企业邮箱:指通过企业认证的邮箱 ,使用者为企业在职员工。
4 校园邮箱:指通过校园认证的邮箱 ,使用者为学校的校园师生。
5 无效邮箱:指不能收发邮件的邮箱。
6 自建邮箱:指私人搭建邮件服务器,所有者可以随意增加或更换邮箱账号。

update_time datetime 更新时间
is_deleted bool

是否删除
当is_deleted为1时,需把此条数据从数据库中删除
当is_deleted为0时,更新此条数据

1.2.2 全量包下载服务

名称 内容
功能描述 获取邮箱后缀全量包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/suffix/initial/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 全量包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_suffix_initial():
    POST_URL: str = f"{HOST}/v2/offline/api/suffix/initial/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_suffix_initial()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

1.2.3 日更包下载服务

名称 内容
功能描述 获取邮箱后缀日更包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/suffix/day/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 日更包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_suffix_day():
    POST_URL: str = f"{HOST}/v2/offline/api/suffix/day/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_suffix_day()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

1.2.4 分钟包下载服务

名称 内容
功能描述 获取邮箱后缀分钟更新包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/suffix/min/next-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 当前已更新版本,格式为%Y%m%d%H%M, exm:202208080000

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
next_version int 下一个版本
next_download_url str 下一个版本号的分钟包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_suffix_min():
    POST_URL: str = f"{HOST}/v2/offline/api/suffix/min/next-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 202208080000,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_suffix_min()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 202208080000,
#    'next_version': 202208080001,
#    'next_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

# 1.3 完整邮箱黑名单数据

# 1.3.1字段说明

初始包、日更包、分钟包文件格式相同

字段 字段类型 字段名称
email_prefix str 邮箱前缀
email_suffix str 邮箱后缀
update_time datetime 更新时间
is_deleted bool

是否删除
当is_deleted为1时,需把此条数据从数据库中删除
当is_deleted为0时,更新此条数据

# 1.3.2全量包下载服务

名称 内容
功能描述 获取完整邮箱黑名单全量包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/full/initial/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 全量包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_full_initial():
    POST_URL: str = f"{HOST}/v2/offline/api/full/initial/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_full_initial()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

# 1.3.3 日更包下载服务

名称 内容
功能描述 获取完整邮箱黑名单日更新包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/full/day/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 日更包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_full_day():
    POST_URL: str = f"{HOST}/v2/offline/api/full/day/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_full_day()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

# 1.3.4分钟包下载服务

名称 内容
功能描述 获取完整邮箱黑名单分钟更新包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/full/min/next-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 当前已更新版本,格式为%Y%m%d%H%M, exm:202208080000

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
next_version int 下一个版本
next_download_url str 下一个版本号的分钟包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_full_min():
    POST_URL: str = f"{HOST}/v2/offline/api/full/min/next-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 202208080000,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_full_min()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 202208080000,
#    'next_version': 202208080001,
#    'next_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

# 1.4 邮箱MX数据

1.4.1文件字段说明

全量包、日更包、分钟包文件格式相同

字段 字段类型 字段名称
mx str 邮箱的mx记录
mx_a list[str] mx的a记录
mx_type

邮箱mx类型
0 未知mx:未能识别该mx类型。
1 公共mx:指在邮箱门户网站注册的邮箱的mx,使用者多为正常用户。
2 临时mx:指无需注册登录且只有几分钟到几小时不等有效性即可收发邮件的邮箱的mx,使用者多为恶意用户。
3 企业mx:指通过企业认证的邮箱的mx ,使用者为企业在职员工。
6 自建mx:指私人搭建邮件服务器,所有者可以随意增加或更换邮箱账号。

update_time datetime 更新时间
is_deleted bool

是否删除
当is_deleted为1时,需把此条数据从数据库中删除
当is_deleted为0时,更新此条数据

1.2.2 全量包下载服务

名称 内容
功能描述 获取邮箱mx全量包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/mx/initial/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 全量包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_mx_initial():
    POST_URL: str = f"{HOST}/v2/offline/api/mx/initial/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_mx_initial()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

1.2.3 日更包下载服务

名称 内容
功能描述 获取邮箱mx日更包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/mx/day/current-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 需要下载的版本,格式为%Y%m%d,exm:20220808

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
current_download_url str 日更包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_mx_day():
    POST_URL: str = f"{HOST}/v2/offline/api/mx/day/current-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 20220808,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_mx_day()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 20220808,
#    'current_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

1.2.4 分钟包下载服务

名称 内容
功能描述 获取邮箱mx分钟更新包
请求地址 https://black-mailbox.yazx.com/v2/offline/api/mx/min/next-download-url (opens new window)
类型 POST
请求格式 JSON

请求体

参数名称 参数类型 是否必传 参数说明
snuser string 用户识别id(配置管理页面中可以查看)
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 是否必传 参数说明
current_version int 当前已更新版本,格式为%Y%m%d%H%M, exm:202208080000

响应体

参数名称 参数类型 参数说明
status int 状态码
errmsg str 对应状态码返回提示信息
data string 由AES的CFB加密的json数据,密钥是SNKEY

data内容如下

参数名称 参数类型 参数说明
current_version int 请求体中的current_version
next_version int 下一个版本
next_download_url str 下一个版本号的分钟包的oss下载链接

代码示例

# -*- encoding: utf-8 -*-

"""
python 版本
# 3.7及以上

# 依赖包
pip install pycryptodome
pip install requests

"""
import base64

import requests
import json
from Crypto.Cipher import AES
from Crypto import Random

SNUSER: str = "test"
SNKEY: str = "test"
HOST: str = "https://black-mailbox.yazx.com"


def aes_encrypt_seg(encrypt_str: str, cecret: str) -> str:
    remainder = len(encrypt_str) % AES.block_size
    if remainder:
        padded_value = encrypt_str + '\0' * (AES.block_size - remainder)
    else:
        padded_value = encrypt_str
    # a random 16 byte key
    iv = Random.new().read(AES.block_size)
    # CFB mode
    cipher = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, iv, segment_size=128)
    # drop the padded value(phone number length is short the 16bytes)
    value = cipher.encrypt(bytes(padded_value, encoding="utf8")[:len(encrypt_str)])
    ciphertext = iv + value
    return str(base64.encodebytes(ciphertext).strip(), encoding="utf8")


def aes_decrypt_seg(encrypt_str: str, cecret: str) -> str:
    data = base64.decodebytes(bytes(encrypt_str, encoding="utf8"))
    cihpertxt = data[AES.block_size:]
    remainder = len(cihpertxt) % AES.block_size
    if remainder:
        padded_value = cihpertxt + b'\0' * (AES.block_size - remainder)
    else:
        padded_value = cihpertxt
    cryptor = AES.new(bytes(cecret, encoding="utf-8"), AES.MODE_CFB, data[0:AES.block_size], segment_size=128)
    plain_text = cryptor.decrypt(padded_value)
    return str(plain_text[0:len(cihpertxt)], encoding="utf8")


def get_mx_min():
    POST_URL: str = f"{HOST}/v2/offline/api/mx/min/next-download-url"
    headers: dict = {"content-type": "application/json"}
    # 通过snkey加密data
    payload: dict = {
        "current_version": 202208080000,
    }
    data: str = aes_encrypt_seg(json.dumps(payload), SNKEY)

    r = requests.post(url=POST_URL, headers=headers, json={"snuser": SNUSER, "data": data})
    rjson: dict = r.json()
    print(rjson)
    if rjson["status"] != 200:
        print(rjson['errmsg'])
    else:
        print(aes_decrypt_seg(rjson["data"], SNKEY))


if __name__ == "__main__":
    get_mx_min()


# 响应体示例
# {
#    'errmsg': 'ok',
#    'status': 200,
#    'data': 'G4EstRHHcsolCtnUvHBwLWDb0Rw94qtctyQDxFJSiSX5hPkisZ4dA+xJ1obyn3Tjzqqz4jy2pssRuVAabqT+mLgaGPtZD5qW2747M551ICvOcPUGcw5+yaeGkY6HS60hBWPf7TgQ9mjE6o/iX0NbuIXnG1tyi8r3BQJoJkTJif6eBsVdpVyaKH0x92MwFcn8DP5lCasQ0RQ376rgxiKSEXLH8e3mNNhFJanrphuGVil7hUhCX/DVitjKdZSyJ/svsxsUG49s8txrX8G0KzEfctXNTQLa7IeNKCORFeh9GFWp7RKJ23IpZo+8V4WMplJ5eh+q7XaQgYe3uAeqC53AfBcwlCiHjBtVM3TqAwM='
# }
# 响应体data示例
# {
#    'current_version': 202208080000,
#    'next_version': 202208080001,
#    'next_download_url': 'http://black-mailbox-package.oss-cn-shenzhen.aliyuncs.com/xxxxxxxxxxxxxxxx'
# }

# 1.5 错误码

错误码表

Code 说明
200 正常状态
430 请求参数错误(请检查请求参数以及aes加密实现)
431 无权限
433 服务器错误
434 当前请求IP不在白名单中 (配置管理地址控制台-配置 (opens new window)
435 服务不存在
436 服务已过期
437 请求的json格式错误

# 二. 使用建议

2.1 命中临时邮箱

由于临时邮箱具有一次性、用完即毁的特点,决定了使用该类邮箱的用户绝非对业务有价值的用户。 同时由于临时邮箱的创建成本极其低廉,黑灰产很容易就能批量创建大量临时邮箱账号从而进行垃圾注册 。

2.2 命中完整邮箱黑名单

该类邮箱是我们明确发现在被黑产持有,而非被自然人所持有的电子邮箱,产生的误判概率极低。

综上所述,对命中临时邮箱或完整邮箱黑名单的用户可以在单一策略下直接拦截。对于无风险邮箱用户可以使用邮箱类型补充用户的画像。

Last Updated: 2/26/2024, 6:43:21 PM