Coverage for src/ragindexer/DocumentIndexer.py: 56%
116 statements
coverage.py v7.8.2, created at 2025-06-20 15:57 +0000
import os
import time
import threading
from pathlib import Path
from typing import Iterable, List, Tuple

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent

from sentence_transformers import SentenceTransformer
from qdrant_client.models import (
    Filter,
    FieldCondition,
    MatchValue,
)

from .documents.DocumentFactory import DocumentFactory
from . import logger
from .index_database import (
    delete_stored_file,
    get_stored_timestamp,
    set_stored_timestamp,
    list_stored_files,
)
from .config import config
from .QdrantIndexer import QdrantIndexer
from .models import ChunkType, EmbeddingType


class DocumentIndexer:
    """
    Object that reacts to filesystem events (document creation/modification/deletion)
    and updates the databases.
    """

    def __init__(self):
        # Load embedding model
        self.model = SentenceTransformer(
            config.EMBEDDING_MODEL,
            trust_remote_code=config.EMBEDDING_MODEL_TRUST_REMOTE_CODE,
            backend="torch",
            cache_folder=config.STATE_DB_PATH.parent / "models",
        )
        self.vector_size = self.model.get_sentence_embedding_dimension()
        self.doc_factory = DocumentFactory()
        self.doc_factory.set_embedding_model(self.model)

        # Initialize Qdrant
        self.qdrant = QdrantIndexer(vector_size=self.vector_size)

        # Lock around state & indexing operations
        self.lock = threading.Lock()

    def extract_text(
        self, abspath: Path
    ) -> Iterable[Tuple[int, List[ChunkType], List[EmbeddingType], dict]]:
        """Extract chunks, embeddings and metadata from a file path.

        Args:
            abspath: Path to the file to analyse

        Yields:
            A tuple with the page index, the list of chunks for that page, the
            corresponding list of embeddings, and the file metadata
        """
        for k_page, chunks, embeddings, file_metadata in self.doc_factory.processDocument(abspath):
            yield k_page, chunks, embeddings, file_metadata
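
    # A hypothetical consumption sketch (the file path below is made up): the generator is
    # meant to be iterated page by page, each item carrying that page's chunks and embeddings.
    #
    #   indexer = DocumentIndexer()
    #   for k_page, chunks, embeddings, meta in indexer.extract_text(Path("/docs/report.pdf")):
    #       logger.debug(f"page {k_page}: {len(chunks)} chunks, {len(embeddings)} embeddings")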

    def process_file(self, filepath: Path, force: bool = False):
        """
        Extract text, chunk, embed, and upsert into Qdrant.

        Args:
            filepath: Path to the file to be analyzed
            force: True to process the file even if the database says that it has already been processed
        """
        stat = os.path.getmtime(filepath)
        stored = get_stored_timestamp(filepath)
        # coverage: this condition was never true in the measured run, so the early return was not exercised
        if (stored is not None and stored == stat) and not force:
            # No change
            return

        logger.info(72 * "=")
        logger.info(f"[INDEX] Processing changed file: '{filepath}'")
        nb_emb = 0
        for k_page, chunks, embeddings, file_metadata in self.extract_text(filepath):
            # Upsert into Qdrant
            self.qdrant.record_embeddings(k_page, chunks, embeddings, file_metadata)
            nb_emb += len(embeddings)

        # Update state DB
        set_stored_timestamp(filepath, stat)
        logger.info(f"[INDEX] Upserted {nb_emb} vectors")
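
    # A hypothetical force-reindex sketch: passing force=True reprocesses a file even when
    # its stored timestamp matches the one on disk, e.g.
    #
    #   for relpath in list_stored_files():
    #       indexer.process_file(config.DOCS_PATH / relpath, force=True)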

    def remove_file(self, filepath: Path):
        """
        Delete all vectors whose payload.source matches this file's path.
        Rather than regenerating the chunk IDs from the stored state, we simply query
        Qdrant by payload.source and delete the matching points.

        Args:
            filepath: Path to the file to be removed, relative to DOCS_PATH
        """
        logger.info(f"[DELETE] Removing file from index: '{filepath}'")

        # Query Qdrant for all points with payload.source == this file's path
        filter_ = Filter(must=[FieldCondition(key="source", match=MatchValue(value=str(filepath)))])

        # Retrieve IDs matching that filter (up to 1000 points per call)
        hits = self.qdrant.search(limit=1000, query_filter=filter_)

        ids_to_delete = [hit.id for hit in hits]
        if ids_to_delete:
            self.qdrant.delete(ids_to_delete)
            logger.info(f"[DELETE] Removed {len(ids_to_delete)} vectors")

        # Remove from state DB
        delete_stored_file(filepath)

    def initial_scan(self) -> int:
        """
        On startup, walk DOCS_PATH and EMAILS_PATH and index any new/modified files.
        Also, find any entries in the state DB that no longer exist on disk, and remove them.

        Returns:
            The number of files that were (re)indexed
        """
        logger.info("Performing initial scan of documents folder...")

        # 1. Build a list of all supported file paths on disk
        disk_files: list[Path] = []
        for base_path in (config.DOCS_PATH, config.EMAILS_PATH):
            for ext in ("*.pdf", "*.docx", "*.xlsx", "*.xlsm", "*.md", "*.txt"):
                disk_files.extend(base_path.rglob(ext))
        disk_files = [p.resolve() for p in disk_files]

        # 2. For each file on disk, check its timestamp against the state DB
        files_to_index = []
        for file_path in disk_files:
            stored = get_stored_timestamp(file_path)
            modified = os.path.getmtime(str(file_path))
            # coverage: this condition was always true in the measured run, so the skip branch was not exercised
            if stored is None or stored != modified:
                files_to_index.append(file_path)

        # 3. For each new/modified file on disk, process its chunks
        tot_nb_files = len(files_to_index)
        for n_file, file_path in enumerate(files_to_index, start=1):
            logger.info(f"Initial indexation of {n_file}/{tot_nb_files} - '{file_path}'")
            stored = get_stored_timestamp(file_path)
            modified = os.path.getmtime(str(file_path))
            self.process_file(file_path)

        # 4. For each file in the state DB, if it is not on disk anymore, delete it from Qdrant
        for relpath in list_stored_files():
            abspath = config.DOCS_PATH / relpath
            # coverage: this condition was never true in the measured run, so the removal branch was not exercised
            if not abspath.exists():
                # Remove from Qdrant
                self.remove_file(relpath)

        return tot_nb_files

    def __on_created_or_modified(self, event: FileSystemEvent):
        if event.is_directory:
            return

        filepath = Path(event.src_path)
        if not self.doc_factory.filter_file(filepath):
            return

        with self.lock:
            # Small delay to allow the file write to finish
            time.sleep(0.5)
            self.process_file(filepath)

    def __on_deleted(self, event: FileSystemEvent):
        if event.is_directory:
            return

        filepath = Path(event.src_path)
        if not self.doc_factory.filter_file(filepath):
            return

        with self.lock:
            self.remove_file(filepath)

    def __on_moved(self, event: FileSystemEvent):
        # TODO Implement folder and file renaming
        if event.is_directory:
            return

        srcpath = Path(event.src_path)
        destpath = Path(event.dest_path)
        if srcpath.suffix in (".pdf", ".docx", ".xlsx", ".xlsm", ".md", ".txt"):
            with self.lock:
                time.sleep(0.5)
                self.remove_file(srcpath)
                self.process_file(destpath)

    def start_watcher(self):
        """
        Launch the filesystem observers (each runs in its own thread), then block
        until both observers are stopped.
        """
        event_handler = FileSystemEventHandler()
        event_handler.on_created = self.__on_created_or_modified
        event_handler.on_modified = self.__on_created_or_modified
        event_handler.on_moved = self.__on_moved
        event_handler.on_deleted = self.__on_deleted

        # Files observer
        self.__docs_observer = Observer()
        self.__docs_observer.schedule(event_handler, path=str(config.DOCS_PATH), recursive=True)
        self.__docs_observer.start()

        logger.info(f"Started file watcher on: '{config.DOCS_PATH}'")

        # Emails observer
        self.__emails_observer = Observer()
        self.__emails_observer.schedule(event_handler, path=str(config.EMAILS_PATH), recursive=True)
        self.__emails_observer.start()

        logger.info(f"Started emails watcher on: '{config.EMAILS_PATH}'")

        # join() blocks the calling thread until the observers terminate
        self.__docs_observer.join()
        self.__emails_observer.join()
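

# A minimal usage sketch, assuming the module is run as a script and that the paths in
# `config` point at existing folders; the project may wire this class up differently.
if __name__ == "__main__":
    indexer = DocumentIndexer()
    nb_files = indexer.initial_scan()
    logger.info(f"Initial scan indexed {nb_files} file(s)")
    indexer.start_watcher()  # blocks until both observers are stopped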