from email.message import Message
from typing import List, Optional
from dedoc.data_structures.line_metadata import LineMetadata
from dedoc.data_structures.line_with_meta import LineWithMeta
from dedoc.data_structures.unstructured_document import UnstructuredDocument
from dedoc.readers.base_reader import BaseReader
class EmailReader(BaseReader):
    """
    This class is used for parsing documents with .eml extension (e-mail messages saved into files).
    """

    def __init__(self, *, config: Optional[dict] = None) -> None:
        """
        :param config: configuration dictionary passed to the base reader and to the nested HTML reader
        """
        from dedoc.extensions import recognized_extensions, recognized_mimes
        from dedoc.readers.html_reader.html_reader import HtmlReader

        super().__init__(
            config=config,
            recognized_extensions=recognized_extensions.eml_like_format,
            recognized_mimes=recognized_mimes.eml_like_format,
        )
        # text/html parts of a message are parsed by the HTML reader
        self.html_reader = HtmlReader(config=self.config)

    def can_read(self, file_path: Optional[str] = None, mime: Optional[str] = None, extension: Optional[str] = None, parameters: Optional[dict] = None) -> bool:
        """
        Check if the document extension or mime is suitable for this reader.
        Look to the documentation of :meth:`~dedoc.readers.BaseReader.can_read` to get information about the method's parameters.
        """
        from dedoc.utils.utils import get_mime_extension

        mime, extension = get_mime_extension(file_path=file_path, mime=mime, extension=extension)
        # this code differs from BaseReader because .eml and .mhtml files have the same mime type,
        # so the extension (when it is known) takes priority over the mime type
        if extension:
            return extension.lower() in self._recognized_extensions
        return mime in self._recognized_mimes

    def read(self, file_path: str, parameters: Optional[dict] = None) -> UnstructuredDocument:
        """
        The method returns document content with all document's lines, tables and attachments.
        This reader is able to add some additional information to the `tag_hierarchy_level` of :class:`~dedoc.data_structures.LineMetadata`.
        It also saves some data from the message's header (fields "subject", "from", "to", "cc", "bcc", "date", "reply-to")
        to the attached json file with prefix `message_header_`.
        Look to the documentation of :meth:`~dedoc.readers.BaseReader.read` to get information about the method's parameters.
        """
        import email
        import json
        import os
        import uuid
        from dedoc.data_structures.attached_file import AttachedFile
        from dedoc.utils.parameter_utils import get_param_attachments_dir, get_param_need_content_analysis, get_param_with_attachments
        from dedoc.utils.utils import get_unique_name

        parameters = {} if parameters is None else parameters
        attachments_dir = get_param_attachments_dir(parameters, file_path)
        with_attachments = get_param_with_attachments(parameters)
        need_content_analysis = get_param_need_content_analysis(parameters)

        with open(file_path, "rb") as f:
            msg = email.message_from_binary_file(f)
        tables, attachments = [], []
        # header values may be email.header.Header objects (not str) when a header holds raw non-ASCII bytes;
        # str() makes every value JSON-serializable and is a no-op for plain strings
        all_header_fields = {name: str(value) for name, value in msg.items()}
        lines = self.__get_main_fields(msg)
        header_filename = "message_header_" + get_unique_name("message_header.json")

        if with_attachments:
            # saving message header into separated file as an attachment
            header_file_path = os.path.join(attachments_dir, header_filename)
            with open(header_file_path, "w", encoding="utf-8") as f:
                json.dump(all_header_fields, f, ensure_ascii=False, indent=4)
            attachments.append(AttachedFile(original_name=header_filename,
                                            tmp_file_path=header_file_path,
                                            uid=f"attach_{uuid.uuid1()}",
                                            need_content_analysis=need_content_analysis))

        html_found = False
        text_parts = []
        # Message.walk() yields the message itself first and then all of its sub-parts,
        # so the root part must not be handled separately (that would duplicate its content)
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain":
                text_parts.append(part)
                continue
            if content_type == "text/html":
                self.__add_content_from_html(part, lines, tables, parameters)
                html_found = True
                continue
            if part.is_multipart():
                # containers carry no content of their own, their children are visited by walk()
                continue
            if with_attachments:
                self.__add_attachment(part, attachments_dir, attachments, need_content_analysis)

        # text/plain has the same content as text/html
        if not html_found:
            for text_part in text_parts:
                try:
                    self.__add_text_content(text_part, lines)
                except Exception as e:
                    # best effort: a broken text part must not prevent reading the rest of the message
                    self.logger.info(f"Error while text reading: {e}")

        return UnstructuredDocument(lines=lines, tables=tables, attachments=attachments)

    def __add_attachment(self, message: Message, attachments_dir: str, attachments: list, need_content_analysis: bool) -> None:
        """
        Save the payload of a non-text message part into `attachments_dir` and register it in `attachments`.

        :param message: message part that may contain an attached file
        :param attachments_dir: directory where the attachment is saved
        :param attachments: list extended in place with the new :class:`AttachedFile`
        :param need_content_analysis: flag forwarded to the created :class:`AttachedFile`
        """
        import mimetypes
        import os
        import uuid
        from dedoc.data_structures.attached_file import AttachedFile
        from dedoc.utils.utils import save_data_to_unique_file

        content_type = message.get_content_type()
        payload = message.get_payload(decode=True)
        if payload is None or content_type == "text/plain" or content_type == "text/html":
            return

        filename = message.get_filename()
        filename = "" if filename is None else self.__get_decoded(filename)
        filename, extension = os.path.splitext(filename)
        filename = self.__fix_filename(filename)
        # attachments without a usable name get a random one
        filename = str(uuid.uuid4()) if filename == "" else filename

        fixed_extension = self.__fix_filename(extension)
        if extension == "" or fixed_extension != extension:
            # missing or suspicious extension: derive it from the content type;
            # guess_extension returns None for unknown types, which must not end up in the file name
            extension = mimetypes.guess_extension(content_type) or ""
        # attachments are never stored with the .bat extension, .txt is used instead
        extension = ".txt" if extension == ".bat" else extension
        filename = f"{filename}{extension}"

        tmp_file_name = save_data_to_unique_file(directory=attachments_dir, filename=filename, binary_data=payload)
        attachments.append(AttachedFile(original_name=filename,
                                        tmp_file_path=os.path.join(attachments_dir, tmp_file_name),
                                        uid=f"attach_{uuid.uuid1()}",
                                        need_content_analysis=need_content_analysis))

    def __add_content_from_html(self, message: Message, lines: list, tables: list, parameters: dict) -> None:
        """
        Parse a text/html message part with the HTML reader and append its lines and tables to the given lists.

        :param message: message part with html content
        :param lines: list extended in place with the lines of the html document
        :param tables: list extended in place with the tables of the html document
        :param parameters: reading parameters forwarded to the HTML reader
        """
        from tempfile import NamedTemporaryFile

        payload = message.get_payload(decode=True)
        if payload is None:
            return

        # the check is done on bytes: for valid UTF-8 it is equivalent to checking the decoded text,
        # and it does not fail on payloads in other encodings
        if b"\\u" in payload:
            # in this case the message wasn't encoded, the raw (textual) payload is used
            payload = message.get_payload()
            mode = "w"
        else:
            mode = "wb"

        # the context manager removes the temporary file even if the HTML reader fails
        with NamedTemporaryFile(mode=mode) as file:
            file.write(payload)
            file.flush()
            document = self.html_reader.read(file_path=file.name, parameters=parameters)

        part_messages = [line for line in document.lines if line.line is not None]
        for line in part_messages:
            line._line += "\n"
        lines.extend(part_messages)
        tables.extend(document.tables)

    def __add_text_content(self, message: Message, lines: list) -> None:
        """
        Split a text/plain message part into lines and append them to `lines`.

        :param message: message part with plain text content
        :param lines: list extended in place, one :class:`LineWithMeta` per text line
        """
        from dedoc.data_structures.hierarchy_level import HierarchyLevel

        payload = message.get_payload(decode=True)
        if payload is None:
            return

        # honour the charset declared in the Content-Type header (UTF-8 when it is absent)
        payload = self.__decode_bytes(payload, message.get_content_charset())
        if "\\u" in payload:
            # in this case the message wasn't encoded
            payload = message.get_payload()

        for text in payload.split("\n"):
            lines.append(LineWithMeta(line=text + "\n",
                                      metadata=LineMetadata(tag_hierarchy_level=HierarchyLevel.create_unknown(), page_id=0, line_id=0),
                                      annotations=[]))

    def __decode_bytes(self, data: bytes, charset: Optional[str]) -> str:
        """
        Decode bytes with the given charset (UTF-8 if it is not set).
        If the charset is unknown or doesn't match the data, UTF-8 with replacement of undecodable bytes is used.
        """
        try:
            return data.decode(charset or "utf-8")
        except (LookupError, UnicodeDecodeError):
            return data.decode("utf-8", errors="replace")

    def __fix_filename(self, filename: str) -> str:
        """
        Replace each of the characters ``<>:"/\\|?*`` with an underscore and collapse whitespace runs into one space.
        """
        import re

        filename = re.sub(r"[<>:\"/\\|?*]", "_", filename)
        filename = re.sub(r"\s+", " ", filename)
        return filename

    def __get_decoded(self, text: str) -> str:
        """
        Decode a header value that may contain RFC 2047 encoded words into a plain string.
        """
        from email.header import decode_header

        chunks = []
        for chunk, charset in decode_header(text):
            if isinstance(chunk, bytes):
                # chunks without an explicit charset are treated as ascii;
                # unknown or mismatching charsets fall back to a lossy UTF-8 decoding instead of failing
                chunk = self.__decode_bytes(chunk, charset or "ascii")
            chunks.append(chunk)
        return "".join(chunks)

    def __get_field(self, message: Message, key: str, line_metadata: LineMetadata) -> LineWithMeta:
        """
        Make a line from the decoded value of the header field `key` (an empty line if the field is absent).
        """
        text = self.__get_decoded(message.get(key.lower(), ""))
        return LineWithMeta(line=text, metadata=line_metadata)

    def __get_main_fields(self, message: Message) -> List[LineWithMeta]:
        """
        Make the first lines of the document from the message header:
        the subject as the root line, then every non-empty main header field as a separate line.
        """
        from dedoc.data_structures.hierarchy_level import HierarchyLevel

        # the root line is always added, even if the subject is empty
        root_metadata = LineMetadata(tag_hierarchy_level=HierarchyLevel(0, 0, False, "root"), page_id=0, line_id=0)
        lines = [self.__get_field(message, "subject", root_metadata)]

        required_fields = ["subject", "from", "to", "cc", "bcc", "date", "reply-to"]
        for field_name in required_fields:
            line_metadata = LineMetadata(tag_hierarchy_level=HierarchyLevel(1, 0, False, field_name), page_id=0, line_id=0)
            line = self.__get_field(message, field_name, line_metadata=line_metadata)
            if len(line.line) > 0:
                lines.append(line)
        return lines