Source code for dedoc.readers.pdf_reader.pdf_txtlayer_reader.pdf_tabby_reader

import os.path
from typing import List, Optional, Tuple, Union

from dedocutils.data_structures import BBox
from numpy import ndarray

from dedoc.common.exceptions.java_not_found_error import JavaNotFoundError
from dedoc.common.exceptions.tabby_pdf_error import TabbyPdfError
from dedoc.data_structures.hierarchy_level import HierarchyLevel
from dedoc.data_structures.line_with_meta import LineWithMeta
from dedoc.data_structures.unstructured_document import UnstructuredDocument
from dedoc.readers.pdf_reader.data_classes.line_with_location import LineWithLocation
from dedoc.readers.pdf_reader.data_classes.pdf_image_attachment import PdfImageAttachment
from dedoc.readers.pdf_reader.data_classes.tables.scantable import ScanTable
from dedoc.readers.pdf_reader.pdf_base_reader import ParametersForParseDoc, PdfBaseReader


[docs]class PdfTabbyReader(PdfBaseReader): """ This class allows to extract content (textual and table) from the .pdf documents with a textual layer (copyable documents). It uses java code to get the result. It is recommended to use this class as a handler for PDF documents with a correct textual layer if you don't need to check textual layer correctness. For more information, look to `pdf_with_text_layer` option description in :ref:`pdf_handling_parameters`. """ def __init__(self, *, config: Optional[dict] = None) -> None: import os from dedoc.extensions import recognized_extensions, recognized_mimes from dedoc.readers.pdf_reader.pdf_image_reader.table_recognizer.table_extractors.concrete_extractors.onepage_table_extractor import \ OnePageTableExtractor from dedoc.readers.pdf_reader.pdf_image_reader.table_recognizer.table_extractors.concrete_extractors.table_attribute_extractor import \ TableHeaderExtractor super().__init__(config=config, recognized_extensions=recognized_extensions.pdf_like_format, recognized_mimes=recognized_mimes.pdf_like_format) self.tabby_java_version = "2.0.0" self.jar_name = "ispras_tbl_extr.jar" self.jar_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "tabbypdf", "jars")) self.java_not_found_error = "`java` command is not found from this Python process. Please ensure Java is installed and PATH is set for `java`" self.default_config = {"JAR_PATH": os.path.join(self.jar_dir, self.jar_name)} self.table_header_selector = TableHeaderExtractor(logger=self.logger) self.table_extractor = OnePageTableExtractor(config=config, logger=self.logger) self.debug_save_path = os.path.join(self.config.get("path_debug", os.path.join(os.path.abspath(os.sep), "tmp", "dedoc")), "tabby")
[docs] def can_read(self, file_path: Optional[str] = None, mime: Optional[str] = None, extension: Optional[str] = None, parameters: Optional[dict] = None) -> bool: """ Check if the document extension is suitable for this reader (PDF format is supported only). This method returns `True` only when the key `pdf_with_text_layer` with value `tabby` is set in the dictionary `parameters`. You can look to :ref:`pdf_handling_parameters` to get more information about `parameters` dictionary possible arguments. Look to the documentation of :meth:`~dedoc.readers.BaseReader.can_read` to get information about the method's parameters. """ from dedoc.utils.parameter_utils import get_param_pdf_with_txt_layer return super().can_read(file_path=file_path, mime=mime, extension=extension) and get_param_pdf_with_txt_layer(parameters) == "tabby"
[docs] def read(self, file_path: str, parameters: Optional[dict] = None) -> UnstructuredDocument: """ The method return document content with all document's lines, tables and attachments. This reader is able to add some additional information to the `tag_hierarchy_level` of :class:`~dedoc.data_structures.LineMetadata`. Look to the documentation of :meth:`~dedoc.readers.BaseReader.read` to get information about the method's parameters. You can also see :ref:`pdf_handling_parameters` to get more information about `parameters` dictionary possible arguments. """ import tempfile from dedoc.utils.parameter_utils import get_param_with_attachments parameters = {} if parameters is None else parameters warnings = [] with tempfile.TemporaryDirectory() as tmp_dir: lines, tables, attachments, document_metadata = self.__extract(path=file_path, parameters=parameters, warnings=warnings, tmp_dir=tmp_dir) if get_param_with_attachments(parameters) and self.attachment_extractor.can_extract(file_path): attachments += self.attachment_extractor.extract(file_path=file_path, parameters=parameters) lines = [line for line_group in lines for line in line_group.split("\n")] lines = self.paragraph_extractor.extract(lines) result = UnstructuredDocument(lines=lines, tables=tables, attachments=attachments, warnings=warnings, metadata=document_metadata) return self._postprocess(result)
def __extract(self, path: str, parameters: dict, warnings: List[str], tmp_dir: str)\ -> Tuple[List[LineWithLocation], List[ScanTable], List[PdfImageAttachment], Optional[dict]]: import math from dedoc.utils.pdf_utils import get_pdf_page_count from dedoc.utils.utils import calculate_file_hash, flatten from dedoc.utils.parameter_utils import get_param_page_slice, get_param_with_attachments from dedoc.utils.parameter_utils import get_param_need_gost_frame_analysis from dedoc.utils.parameter_utils import get_param_need_header_footers_analysis all_lines, all_tables, all_scan_tables, all_attached_images = [], [], [], [] with_attachments = get_param_with_attachments(parameters) document_metadata = None file_hash = calculate_file_hash(path=path) page_count = get_pdf_page_count(path) page_count = math.inf if page_count is None else page_count first_page, last_page = get_param_page_slice(parameters) empty_page_limit = (first_page is not None and first_page >= page_count) or (last_page is not None and first_page >= last_page) partial_page_limit = (first_page is not None and first_page > 0) or (last_page is not None and last_page < page_count) if empty_page_limit or partial_page_limit: warnings.append("The document is partially parsed") document_metadata = dict(first_page=first_page) if last_page is not None: document_metadata["last_page"] = last_page if empty_page_limit: return all_lines, all_tables, all_attached_images, document_metadata remove_gost_frame = get_param_need_gost_frame_analysis(parameters) gost_json_path = self.__save_gost_frame_boxes_to_json(first_page=first_page, last_page=last_page, page_count=page_count, tmp_dir=tmp_dir, path=path) \ if remove_gost_frame else "" # in java tabby reader page numeration starts with 1, end_page is included first_tabby_page = first_page + 1 if first_page is not None else 1 last_tabby_page = page_count if (last_page is None) or (last_page is not None and last_page > page_count) else last_page self.logger.info(f"Reading PDF pages from {first_tabby_page} to {last_tabby_page}") document = self.__process_pdf(path=path, start_page=first_tabby_page, end_page=last_tabby_page, tmp_dir=tmp_dir, gost_json_path=gost_json_path, remove_frame=remove_gost_frame) pages = document.get("pages", []) lines = [] for page in pages: page_lines = self.__get_lines_with_location(page, file_hash) if page_lines: lines.append(page_lines) scan_tables = self.__get_tables(page) all_scan_tables.extend(scan_tables) attached_images = self.__get_attached_images(page=page, parameters=parameters, path=path) if with_attachments else [] if attached_images: all_attached_images.extend(attached_images) if get_param_need_header_footers_analysis(parameters): lines, headers, footers = self.header_footer_detector.detect(lines) all_lines = list(flatten(lines)) mp_tables = self.table_recognizer.convert_to_multipages_tables(all_scan_tables, lines_with_meta=all_lines) all_lines = self.linker.link_objects(lines=all_lines, tables=mp_tables, images=all_attached_images) if self.config.get("debug_mode", False): self.__save_objects_bboxes(all_lines + mp_tables + all_attached_images, path) return all_lines, mp_tables, all_attached_images, document_metadata def __save_gost_frame_boxes_to_json(self, first_page: Optional[int], last_page: Optional[int], page_count: int, path: str, tmp_dir: str) -> str: from joblib import Parallel, delayed import json first_page = 0 if first_page is None or first_page < 0 else first_page last_page = page_count if (last_page is None) or (last_page is not None and last_page > page_count) else last_page images = self._get_images(path, first_page, last_page) gost_analyzed_images = Parallel(n_jobs=self.config["n_jobs"])(delayed(self.gost_frame_recognizer.rec_and_clean_frame)(image) for image in images) result_dict = { page_number: {**page_data[1].to_dict(), **{"original_image_width": page_data[2][1], "original_image_height": page_data[2][0]}} for page_number, page_data in enumerate(gost_analyzed_images, start=first_page) } result_json_path = os.path.join(tmp_dir, "gost_frame_bboxes.json") with open(result_json_path, "w") as f: json.dump(result_dict, f) return result_json_path def __get_tables(self, page: dict) -> List[ScanTable]: from dedoc.readers.pdf_reader.data_classes.tables.cell import Cell from dedoc.data_structures.concrete_annotations.bbox_annotation import BBoxAnnotation from dedoc.data_structures.line_metadata import LineMetadata scan_tables = [] page_number = page["number"] page_width = int(page["width"]) page_height = int(page["height"]) for table in page["tables"]: table_bbox = BBox(x_top_left=table["x_top_left"], y_top_left=table["y_top_left"], width=table["width"], height=table["height"]) order = table["order"] rows = table["rows"] cell_properties = table["cell_properties"] assert len(rows) == len(cell_properties) cells = [] for num_row, row in enumerate(rows): assert len(row) == len(cell_properties[num_row]) result_row = [] for num_col, cell in enumerate(row): annotations = [] cell_blocks = cell["cell_blocks"] for c in cell_blocks: cell_bbox = BBox(x_top_left=int(c["x_top_left"]), y_top_left=int(c["y_top_left"]), width=int(c["width"]), height=int(c["height"])) annotations.append(BBoxAnnotation(c["start"], c["end"], cell_bbox, page_width=page_width, page_height=page_height)) current_cell_properties = cell_properties[num_row][num_col] bbox = BBox(x_top_left=int(current_cell_properties["x_top_left"]), y_top_left=int(current_cell_properties["y_top_left"]), width=int(current_cell_properties["width"]), height=int(current_cell_properties["height"])) result_row.append(Cell( bbox=bbox, lines=[LineWithMeta(line=cell["text"], metadata=LineMetadata(page_id=page_number, line_id=0), annotations=annotations)], colspan=current_cell_properties["col_span"], rowspan=current_cell_properties["row_span"], invisible=bool(current_cell_properties["invisible"]) )) cells.append(result_row) try: cells = self.table_extractor.handle_cells(cells) scan_tables.append(ScanTable(page_number=page_number, cells=cells, bbox=table_bbox, order=order, page_width=page_width, page_height=page_height)) except Exception as ex: self.logger.warning(f"Warning: unrecognized table on page {page_number}. {ex}") if self.config.get("debug_mode", False): raise ex return scan_tables def __get_attached_images(self, page: dict, parameters: dict, path: str) -> List[PdfImageAttachment]: import os import shutil import uuid from dedoc.readers.pdf_reader.data_classes.tables.location import Location from dedoc.utils.utils import get_unique_name from dedoc.utils.parameter_utils import get_param_attachments_dir, get_param_need_content_analysis attachments_dir = get_param_attachments_dir(parameters, path) need_content_analysis = get_param_need_content_analysis(parameters) image_attachment_list = [] for image_dict in page["images"]: image_location = Location( page_number=page["number"], bbox=BBox(x_top_left=image_dict["x_top_left"], y_top_left=image_dict["y_top_left"], width=image_dict["width"], height=image_dict["height"]) ) tmp_file_name = get_unique_name(image_dict["original_name"]) tmp_file_path = os.path.join(attachments_dir, tmp_file_name) shutil.move(image_dict["tmp_file_path"], tmp_file_path) image_attachment = PdfImageAttachment( original_name=image_dict["original_name"], tmp_file_path=tmp_file_path, need_content_analysis=need_content_analysis, uid=f"attach_{uuid.uuid4()}", location=image_location ) image_attachment_list.append(image_attachment) return image_attachment_list def __get_lines_with_location(self, page: dict, file_hash: str) -> List[LineWithLocation]: from dedoc.data_structures.concrete_annotations.bbox_annotation import BBoxAnnotation from dedoc.data_structures.concrete_annotations.bold_annotation import BoldAnnotation from dedoc.data_structures.concrete_annotations.indentation_annotation import IndentationAnnotation from dedoc.data_structures.concrete_annotations.italic_annotation import ItalicAnnotation from dedoc.data_structures.concrete_annotations.linked_text_annotation import LinkedTextAnnotation from dedoc.data_structures.concrete_annotations.size_annotation import SizeAnnotation from dedoc.data_structures.concrete_annotations.spacing_annotation import SpacingAnnotation from dedoc.data_structures.concrete_annotations.style_annotation import StyleAnnotation from dedoc.data_structures.line_metadata import LineMetadata from dedoc.readers.pdf_reader.data_classes.tables.location import Location lines = [] page_number, page_width, page_height = page["number"], int(page["width"]), int(page["height"]) labeling_mode = self.config.get("labeling_mode", False) for block in page["blocks"]: annotations = [] order = block["order"] block_text = block["text"] len_block = len(block_text) annotations.append(IndentationAnnotation(0, len_block, str(block["indent"]))) annotations.append(SpacingAnnotation(0, len_block, str(block["spacing"]))) for annotation in block["annotations"]: start = annotation["start"] end = annotation["end"] if not labeling_mode: box = BBox(x_top_left=int(annotation["x_top_left"]), y_top_left=int(annotation["y_top_left"]), width=int(annotation["width"]), height=int(annotation["height"])) annotations.append(BBoxAnnotation(start, end, box, page_width=page_width, page_height=page_height)) annotations.append(SizeAnnotation(start, end, str(annotation["font_size"]))) annotations.append(StyleAnnotation(start, end, annotation["font_name"])) if annotation["is_bold"]: annotations.append(BoldAnnotation(start, end, "True")) if annotation["is_italic"]: annotations.append(ItalicAnnotation(start, end, "True")) if annotation["metadata"] == "LINK": annotations.append(LinkedTextAnnotation(start, end, annotation["url"])) bbox = BBox(x_top_left=int(block["x_top_left"]), y_top_left=int(block["y_top_left"]), width=int(block["width"]), height=int(block["height"])) if labeling_mode: annotations.append(BBoxAnnotation(0, len_block, bbox, page_width=page_width, page_height=page_height)) meta = block["metadata"].lower() uid = f"txt_{file_hash}_{order}" metadata = LineMetadata(page_id=page_number, line_id=order) line_with_location = LineWithLocation(line=block_text, metadata=metadata, annotations=annotations, uid=uid, location=Location(bbox=bbox, page_number=page_number), order=order) line_with_location.metadata.tag_hierarchy_level = self.__get_tag(line_with_location, meta) lines.append(line_with_location) return lines def __get_tag(self, line: LineWithMeta, line_type: str) -> HierarchyLevel: from dedoc.structure_extractors.feature_extractors.list_features.list_utils import get_dotted_item_depth if line_type == HierarchyLevel.header: header_level = get_dotted_item_depth(line.line) header_level = header_level if header_level != -1 else 1 return HierarchyLevel(1, header_level, False, line_type) if line_type == "litem": # TODO automatic list depth and merge list items from multiple lines return HierarchyLevel(None, None, False, HierarchyLevel.list_item) return HierarchyLevel.create_unknown() def __jar_path(self) -> str: import os return os.environ.get("TABBY_JAR", self.default_config["JAR_PATH"]) def __run(self, path: str, tmp_dir: str, encoding: str = "utf-8", start_page: int = None, end_page: int = None, remove_frame: bool = False, gost_json_path: str = "" ) -> bytes: import subprocess args = ["java"] + ["-jar", self.__jar_path(), "-i", path, "-tmp", f"{tmp_dir}/"] if remove_frame: args += ["-rf", gost_json_path] if start_page is not None and end_page is not None: args += ["-sp", str(start_page), "-ep", str(end_page)] try: result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL, check=True) if result.stderr: self.logger.warning(f"Got stderr: {result.stderr.decode(encoding)}") return result.stdout except FileNotFoundError: raise JavaNotFoundError(self.java_not_found_error) except subprocess.CalledProcessError as e: raise TabbyPdfError(e.stderr.decode(encoding)) def __process_pdf(self, path: str, tmp_dir: str, start_page: int = None, end_page: int = None, gost_json_path: str = "", remove_frame: bool = False) -> dict: import json import os self.__run(path=path, start_page=start_page, end_page=end_page, tmp_dir=tmp_dir, remove_frame=remove_frame, gost_json_path=gost_json_path) with open(os.path.join(tmp_dir, "data.json"), "r") as response: document = json.load(response) return document def _process_one_page(self, image: ndarray, parameters: ParametersForParseDoc, page_number: int, path: str) -> Tuple[List[LineWithLocation], List[ScanTable], List[PdfImageAttachment], List[float]]: return [], [], [], [] def __save_objects_bboxes(self, all_objects: List[Union[LineWithLocation, ScanTable, PdfImageAttachment]], path: str) -> None: if not all_objects: return import json import os from dedoc.data_structures.concrete_annotations.bbox_annotation import BBoxAnnotation os.makedirs(self.debug_save_path, exist_ok=True) dsize = None lines = [line for line in all_objects if isinstance(line, LineWithLocation)] if lines: annotations = [ann for ann in lines[0].annotations if isinstance(ann, BBoxAnnotation)] if annotations: bbox = json.loads(annotations[0].value) dsize = (bbox["page_width"], bbox["page_height"]) import cv2 import numpy as np from dedoc.utils.pdf_utils import get_page_image current_page_id = None current_image = None all_objects.sort(key=lambda o: (o.location.page_number, o.order, o.location)) for current_object in all_objects: if current_object.location.page_number != current_page_id: if current_image is not None: cv2.imwrite(os.path.join(self.debug_save_path, f"{current_page_id}.png"), current_image) current_page_id = current_object.location.page_number current_image = np.asarray(get_page_image(path, current_page_id)) if dsize: current_image = cv2.resize(current_image, dsize=dsize, interpolation=cv2.INTER_CUBIC) p1 = (current_object.location.bbox.x_top_left, current_object.location.bbox.y_top_left) p2 = (current_object.location.bbox.x_bottom_right, current_object.location.bbox.y_bottom_right) if isinstance(current_object, LineWithLocation): color = (0, 255, 0) elif isinstance(current_object, ScanTable): color = (255, 0, 0) else: color = (0, 0, 255) cv2.rectangle(current_image, p1, p2, color) if current_image is not None: cv2.imwrite(os.path.join(self.debug_save_path, f"{current_page_id}.png"), current_image)