Source code for BRAD.scraper

"""
Literature Repositories
------------------------

This module provides functionality for performing web scraping on various literature archives, including 
`arXiv <https://arxiv.org>`_, `bioRxiv <https://www.biorxiv.org>`_, and `PubMed <https://pubmed.ncbi.nlm.nih.gov>`_. 
The system scrapes these databases to find relevant literature, which can then be downloaded and included in the RAG 
(Retrieval-Augmented Generation) database.


Main Methods
~~~~~~~~~~~~

1. **webScraping**:  
   Selects the correct literature repository based on user input and directs the scraping process to the appropriate database.

2. **arxiv**:  
   Scrapes literature from `arXiv <https://arxiv.org>`_, a preprint server for research papers in fields such as physics, mathematics, 
   computer science, and biology.

3. **biorxiv**:  
   Scrapes literature from `bioRxiv <https://www.biorxiv.org>`_, a preprint repository focused on biology and life sciences.

4. **pubmed**:  
   Scrapes literature from `PubMed <https://pubmed.ncbi.nlm.nih.gov>`_, a database of biomedical and life sciences journal articles 
   maintained by the National Library of Medicine (NLM).


Available Methods
~~~~~~~~~~~~~~~~~

This module has the following methods:

"""


import subprocess

from IPython.display import display

from langchain.document_loaders import DirectoryLoader
from langchain.document_loaders import UnstructuredPDFLoader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
import chromadb
import pandas as pd
from requests_html import HTMLSession
import requests
from requests.exceptions import ConnectionError
import os
from Bio import Entrez
import math
import pandas as pd
import datetime
import time
import sys
import string
import gc
from bs4 import BeautifulSoup as bs
from urllib.parse import urljoin
from requests_html import HTMLSession
import requests
from requests.exceptions import ConnectionError
from langchain import PromptTemplate, LLMChain
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
import chromadb
import subprocess

from langchain_community.document_loaders import PyPDFLoader
from langchain.document_loaders import DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

from BRAD.promptTemplates import scrapeTemplate
from BRAD import utils
from BRAD import log
from BRAD import justchat

def webScraping(state):
    """
    Dispatch a web-scraping request to the search stage or the download stage.

    When ``state['continue-module']`` is ``None`` the request is treated as a
    fresh search and handed to :func:`webScrapingStageOne`. Otherwise it is
    treated as the follow-up to an earlier search and handed to
    :func:`webScrapingStageTwo`, after which ``state['continue-module']`` is
    reset to ``None``.

    :param state: The status of the chat, containing information about the
        current prompt and configuration.
    :type state: dict
    :return: The updated chat status after executing the web scraping process.
    :rtype: dict
    """
    # Auth: Joshua Pickard
    #       jpic@umich.edu
    # Date: May 20, 2024
    if state['continue-module'] is not None:
        # A previous search is pending: download from the database.
        state = webScrapingStageTwo(state)
        # The follow-up has been consumed, so clear the continuation marker.
        state['continue-module'] = None
        return state

    # No pending continuation: search in the database.
    state = webScrapingStageOne(state)
    return state
def webScrapingStageOne(state):
    """
    Run the first (search) round of the scraping workflow.

    The LLM is asked, via the scrape prompt template, which literature
    database to use and which search terms to issue. The parsed answer is
    logged, and the selected scraping function is called once per search term,
    up to ``state['config']['SCRAPE']['max_search_terms']`` terms.

    :param state: The status of the chat. This function reads ``prompt``,
        ``llm``, ``memory``, ``search``, ``config`` and appends to
        ``state['process']['steps']``.
    :type state: dict
    :return: The updated chat status.
    :rtype: dict
    """
    query = state['prompt']
    llm = state['llm']        # get the llm
    memory = state['memory']  # get the memory of the model

    # Define the mapping of keywords to functions
    scraping_functions = {
        'ARXIV': arxivStageOne,
        'BIORXIV': biorxiv,
        'PUBMED': pubmed,
    }

    # Identify the database and the search terms
    template = scrapeTemplate()
    template = template.format(search_terms=state['search']['used terms'])
    PROMPT = PromptTemplate(input_variables=["history", "input"], template=template)
    conversation = ConversationChain(
        prompt=PROMPT,
        llm=llm,
        verbose=state['config']['debug'],
        memory=memory,
    )
    response = conversation.predict(input=query)
    llmResponse = parse_llm_response(response)
    log.debugLog(llmResponse, state=state)

    # Different LLM wrappers expose their model identifier under different
    # attributes. ``except Exception`` (rather than a bare ``except``) keeps
    # KeyboardInterrupt / SystemExit from being swallowed here.
    try:
        llmType = str(llm.model)
    except Exception:
        try:
            llmType = str(llm.model_name)
        except Exception:
            llmType = str(llm)

    state['process']['steps'].append(
        log.llmCallLog(
            llm=llmType,
            prompt=PROMPT,
            memory=memory,
            input=query,
            output=response,
            parsedOutput=llmResponse,
            purpose='identify how to web scrape',
        )
    )
    llmKey, searchTerms = llmResponse['database'].upper(), llmResponse['search_terms']

    # Determine the target source
    source = next((key for key in scraping_functions if key == llmKey), 'PUBMED')

    # TODO: remove this hardcoded value for debugging
    # NOTE(review): this override makes the LLM's database choice above
    # irrelevant; every search currently goes to arXiv.
    source = 'ARXIV'
    process = {'searched': source}
    scrape_function = scraping_functions[source]

    # Execute the scraping function and handle errors
    try:
        output = f'searching on {source}...'
        log.debugLog(output, state=state)
        log.debugLog('Search Terms: ' + str(searchTerms), state=state)
        for numTerm, st in enumerate(searchTerms):
            if numTerm == state['config']['SCRAPE']['max_search_terms']:
                break
            # TODO will need to save these results
            result = scrape_function(st, state)
            # Some scrapers do not return the state; never replace the
            # state with None, or the bookkeeping below would fail.
            if result is not None:
                state = result
    except Exception as e:
        output = f'Error occurred while searching on {source}: {e}'
        log.debugLog(output, state=state)
        process = {'searched': 'ERROR'}

    state['process']['steps'].append(process)
    return state
def webScrapingStageTwo(state):
    """
    Run the second (download) round of the scraping workflow.

    The user's prompt is interpreted as the answer to the download question
    asked at the end of stage one. Anything other than ``Y`` (case-insensitive,
    surrounding whitespace ignored) is passed to the general chat via
    ``justchat.llm_only``. On ``Y`` the download function matching the
    database recorded in ``state['continue-module'][1]['database']`` is
    called; unknown databases fall back to arXiv.

    :param state: The status of the chat. ``state['continue-module']`` must
        hold the tuple stored by stage one.
    :type state: dict
    :return: The updated chat status.
    :rtype: dict
    """
    query = state['prompt']

    # Strip before comparing so answers such as "y " or " Y" are accepted.
    if query.strip().upper() != 'Y':
        # TODO: this should route directly to the general chat
        state = justchat.llm_only(state)
        return state

    toolHistory = state['continue-module'][1]

    scraping_functions = {
        'ARXIV': arxivStageTwo,
        'BIORXIV': biorxiv,
        'PUBMED': pubmed,
    }
    source = next(
        (key for key in scraping_functions if key == toolHistory['database']),
        'ARXIV',
    )
    scrape_function = scraping_functions[source]

    result = scrape_function(query, state)
    # Some scrapers do not return the state; keep the current state in that
    # case instead of handing None back to the caller.
    if result is not None:
        state = result

    # if state['config']['SCRAPE']['add_from_scrape']:
    #     state = updateDatabase(state)
    # state['output'] = "Articles were successfully downloaded."
    return state
def arxivStageOne(query, state):
    """
    Search arXiv for articles related to a user's query and display the
    results so they can be downloaded in stage two.

    In GUI mode the results are rendered as a markdown table and sent through
    ``log.userOutput`` together with the download question; otherwise the
    table is shown with ``IPython.display.display``. The PDF links of the
    results are stored in ``state['continue-module']`` for stage two.

    :param query: The search query for arXiv.
    :type query: str
    :param state: The status of the chat.
    :type state: dict
    :return: The updated chat status.
    :rtype: dict
    """
    output = 'searching the following on arxiv: ' + query
    state = log.userOutput(output, state=state)

    df, pdfs = arxiv_search(query, 10, state=state)

    # Read the GUI flag once, with a default, so that sessions whose state has
    # no 'gui' key do not fail with KeyError further down.
    gui = state.get('gui', False)

    displayDf = set_arxiv_df_display(df, pdfs, gui=gui)
    if gui:
        output = "\n\n"
        output += displayDf.to_markdown()
        output += "\n\n"
    else:
        display(displayDf)

    if state['config']['SCRAPE']['save_search_results']:
        utils.save(state, df, "arxiv-search-" + str(query) + '.csv')

    # Only ask the user when nothing else is queued.
    if len(state['queue']) == 0:
        if gui:
            # In GUI mode the question is appended below the results table.
            output += '\n Would you like to download these articles [Y/N]?'
        else:
            output = '\n Would you like to download these articles [Y/N]?'
        state = log.userOutput(output, state=state)

    # NOTE(review): the original indentation was lost; this assignment is
    # assumed to run regardless of the queue check above -- confirm.
    state['continue-module'] = ('SCRAPE', {
        'database': 'ARXIV',
        'pdfs': pdfs,
    })
    return state
def set_arxiv_df_display(df, pdfs, gui=False):
    """
    Build the two-column table (title, authors) shown for arXiv search results.

    A copy of the ``Title`` and ``Authors`` columns of ``df`` is returned; the
    input frame is not modified. When ``gui`` is truthy each title is replaced
    by a markdown link ``[title](pdf_url)``; otherwise titles are left as-is.

    :param df: The arXiv results. It must include a ``Title`` and an
        ``Authors`` column.
    :type df: pandas.DataFrame
    :param pdfs: PDF URLs, in the same order as the rows of ``df``.
    :type pdfs: list of str
    :param gui: Whether to turn the titles into markdown links.
    :type gui: bool
    :return: A DataFrame with the columns ``Title`` and ``Authors``.
    :rtype: pandas.DataFrame
    :raises ValueError: If the number of URLs differs from the number of titles.

    Example:
        >>> df = pd.DataFrame({'Title': ['Paper A', 'Paper B'], 'Authors': ['Author X', 'Author Y']})
        >>> pdfs = ['http://arxiv.org/pdf/paperA.pdf', 'http://arxiv.org/pdf/paperB.pdf']
        >>> display_df = set_arxiv_df_display(df, pdfs, gui=True)
        >>> print(display_df)
    """
    # Auth: Joshua Pickard
    #       jpic@umich.edu
    # Date: Nov. 17, 2024

    # Every title needs exactly one PDF link.
    title_count = len(df['Title'])
    if title_count != len(pdfs):
        raise ValueError("The number of titles in the DataFrame must match the number of PDF URLs.")

    # Work on a copy restricted to the displayed columns.
    table = df[['Title', 'Authors']].copy()
    if not gui:
        return table

    # GUI mode: wrap each title in a markdown link to its PDF.
    linked_titles = []
    for title, url in zip(table['Title'].values, pdfs):
        linked_titles.append(f"[{title}]({url})")
    table['Title'] = linked_titles
    return table
def arxivStageTwo(query, state):
    """
    Download the arXiv PDFs found in stage one and report them to the user.

    The PDF links are read from ``state['continue-module'][1]['pdfs']`` and
    passed to :func:`arxiv_scrape`; the string it returns is shown to the user
    through ``log.userOutput``.

    :param query: The user's prompt. It is not used by this function.
    :type query: str
    :param state: The status of the chat.
    :type state: dict
    :return: The chat status returned by ``log.userOutput``.
    :rtype: dict
    """
    stage_one_record = state['continue-module'][1]
    downloaded = arxiv_scrape(stage_one_record['pdfs'], state)
    message = "The following articles were downloaded:\n\n" + downloaded
    state = log.userOutput(message, state=state)
    return state
def arxiv(query, state):
    """
    Search arXiv for articles matching the query, display the results, and
    optionally download the articles as PDFs.

    When nothing is queued and the session is interactive, the user is asked
    on standard input whether to download; in a non-interactive session the
    ``download_search_results`` setting is used; when other work is queued the
    articles are always downloaded.

    :param query: The search query for arXiv.
    :type query: str
    :param state: The status of the chat.
    :type state: dict
    :return: A tuple containing the output message and a process dictionary
        with the keys ``'search results'`` and ``'download'``.
    :rtype: tuple
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    process = {}
    output = 'searching the following on arxiv: ' + query
    state = log.userOutput(output, state=state)

    df, pdfs = arxiv_search(query, 10, state=state)
    process['search results'] = df

    displayDf = df[['Title', 'Authors', 'Abstract']]
    display(displayDf)

    if state['config']['SCRAPE']['save_search_results']:
        utils.save(state, df, "arxiv-search-" + str(query) + '.csv')

    if len(state['queue']) == 0:
        if state['interactive']:
            question = 'Would you like to download these articles [Y/N]?'
            output += '\n Would you like to download these articles [Y/N]?'
            state = log.userOutput(question, state=state)
            download = input().strip().upper()
            state['process']['steps'].append(
                {
                    'func': 'scraper.arxiv',
                    # Record the question that was actually shown to the user
                    # (the original logged an unrelated prompt text here).
                    'prompt to user': question,
                    'input': download,
                    'purpose': 'decide to download pdfs or not',
                }
            )
        else:
            # The original read state['SCRAPE']; every other setting in this
            # file lives under state['config']['SCRAPE'], so fall back to that
            # location when the top-level key is absent.
            # NOTE(review): confirm which location is authoritative.
            scrape_cfg = state['SCRAPE'] if 'SCRAPE' in state else state['config']['SCRAPE']
            download = scrape_cfg['download_search_results']
    else:
        # Other work is queued: download without asking.
        download = 'Y'

    process['download'] = (download == 'Y')
    if download == 'Y':
        output += arxiv_scrape(pdfs, state)
    return output, process
def search_pubmed_article(query, number_of_articles=10, state=None):
    """
    Search PubMed for articles matching the query and return their PMIDs.

    The search is issued through ``Bio.Entrez.esearch`` against the ``pubmed``
    database, sorted by relevance.

    :param query: The keyword or phrase to search for in PubMed articles.
    :type query: str
    :param number_of_articles: The maximum number of article PMIDs to return.
        Defaults to 10.
    :type number_of_articles: int
    :param state: The status of the chat; only passed on to the debug logger.
    :type state: dict, optional
    :return: A list of PMIDs for articles matching the query.
    :rtype: list
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    # NOTE(review): the contact address sent to NCBI is hardcoded here;
    # consider moving it to configuration.
    Entrez.email = 'inhyak@gmail.com'
    log.debugLog("search_pubmed_article", state=state)
    log.debugLog(f'term={query}', state=state)

    handle = Entrez.esearch(db='pubmed', term=query, retmax=number_of_articles, sort='relevance')
    log.debugLog(f'handle={str(handle)}', state=state)
    try:
        record = Entrez.read(handle)
        log.debugLog(f'record={str(record)}', state=state)
    finally:
        # Release the handle even when parsing the response fails.
        handle.close()
    return record['IdList']
def pubmed(query, state):
    """
    Search PubMed for articles matching the query and download the PDFs that
    are available through an ``ncbi.nlm.nih.gov`` full-text link.

    For every PMID returned by :func:`search_pubmed_article` the PubMed page
    is fetched, its ``a.id-link`` anchor is followed, and the PDF behind the
    ``a.int-view`` anchor of that page is saved as ``<pmid>.pdf`` in the
    directory given by ``utils.pdfDownloadPath(state)``. Articles that cannot
    be fetched are logged and skipped.

    :param query: The keyword to search for in PubMed articles.
    :type query: str
    :param state: The status of the chat.
    :type state: dict
    :return: The chat status that was passed in (the original returned
        nothing although callers assign the result to their state).
    :rtype: dict
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    pmid_list = search_pubmed_article(query, state=state)

    if pmid_list:
        s = HTMLSession()
        headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36'}

        # Resolve the path outside the try block so that the error message
        # below can always refer to it.
        path = utils.pdfDownloadPath(state)  # os.path.abspath(os.getcwd()) + '/specialized_docs'
        try:
            os.makedirs(path, exist_ok=True)
            log.debugLog("Directory '%s' created successfully" % path, state=state)
        except OSError:
            log.debugLog("Directory '%s' can not be created" % path, state=state)

        base_url = 'https://pubmed.ncbi.nlm.nih.gov/'
        for pmc in pmid_list:
            try:
                r = s.get(base_url + pmc + '/', headers=headers, timeout=5)

                id_link = r.html.find('a.id-link', first=True)
                if id_link is None:
                    continue
                pdf_url = id_link.attrs['href']
                # Only links that stay on ncbi.nlm.nih.gov are followed.
                if 'ncbi.nlm.nih.gov' not in pdf_url:
                    continue

                r = s.get(pdf_url, headers=headers, timeout=5)
                pdf_anchor = r.html.find('a.int-view', first=True)
                if pdf_anchor is None:
                    # The full-text page has no PDF link.
                    log.debugLog(f"{pmc} could not be gathered.", state=state)
                    continue

                pdf_real = 'https://ncbi.nlm.nih.gov' + pdf_anchor.attrs['href']
                r = s.get(pdf_real, stream=True, timeout=5)
                with open(os.path.join(path, pmc + '.pdf'), 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)
            except requests.exceptions.RequestException as e:
                # Covers connection errors and the timeouts enabled by
                # timeout=5, so one slow article does not abort the run.
                log.debugLog(str(e), state=state)
                log.debugLog(f"{pmc} could not be gathered.", state=state)
    else:
        log.debugLog("no articles found", state=state)

    log.debugLog("pdf collection complete!", state=state)
    return state
def biorxiv(query, state):
    """
    Search the bioRxiv preprint server for articles matching a query.

    The search is delegated to ``biorxiv_real_search`` with the query as the
    only keyword, restricted to the window from today's date in 2015 up to
    today, and limited to 10 records.

    :param query: The keyword to search for in bioRxiv articles.
    :type query: str
    :param state: The status of the chat.
    :type state: dict
    :return: The chat status that was passed in (the original returned
        nothing although callers assign the result to their state).
    :rtype: dict
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    today = datetime.date.today()
    try:
        start_date = today.replace(year=2015)
    except ValueError:
        # 2015 is not a leap year, so Feb 29 has no counterpart there.
        start_date = today.replace(year=2015, day=28)

    biorxiv_real_search(
        state=state,
        start_date=start_date,
        end_date=today,
        subjects=[],
        journal='biorxiv',
        kwd=[query],
        kwd_type='all',
        athr=[],
        max_records=10,
        max_time=300,
        cols=['title', 'authors', 'url'],
        abstracts=False,
    )
    return state
#Parsers
def create_db(query, query2, state=None):
    """
    Create Chroma databases from scraped articles and PDFs.

    The literature repositories are scraped, every PDF in the download
    directory is loaded and split into chunks, and one persistent Chroma
    collection (cosine distance) is written per chunk-size / chunk-overlap
    combination.

    :param query: The keyword to search for in PubMed articles.
    :type query: str
    :param query2: The keyword to search for in arXiv and bioRxiv articles.
    :type query2: str
    :param state: The status of the chat, used for logging and for locating
        the PDF download directory. The original body referenced ``state``
        without defining it; it is now an optional argument.
    :type state: dict, optional
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    log.debugLog('creating database (this might take a while)', state=state)

    # NOTE(review): these three names are not defined in the visible part of
    # this module (compare `arxiv_scrape`, `biorxiv`, `pubmed`) -- confirm
    # they exist or update the calls.
    arxivscrape(query2)
    biorxiv_scrape(query2)
    pubmedscrape(query)

    local = os.getcwd()  # Get local dir
    log.debugLog('\nWork Directory: {}'.format(local), state=state)

    # Phase 1 - Load embedding model
    embeddings_model = HuggingFaceEmbeddings(model_name='BAAI/bge-base-en-v1.5')

    # Phase 2 - Load documents
    # Without a state, fall back to the directory named in the original
    # comment. NOTE(review): confirm this default.
    if state is not None:
        path_docs = utils.pdfDownloadPath(state)
    else:
        path_docs = './specialized_docs/'
    # The original passed the path as a second positional argument in
    # addition to state=...; fold it into the message instead.
    log.debugLog('\nDocuments loading from: ' + str(path_docs), state=state)
    text_loader_kwargs = {'autodetect_encoding': True}
    loader = DirectoryLoader(
        path_docs,
        glob="**/*.pdf",
        loader_cls=UnstructuredPDFLoader,
        loader_kwargs=text_loader_kwargs,
        show_progress=True,
        use_multithreading=True,
    )
    docs_data = loader.load()
    log.debugLog('\nDocuments loaded...', state=state)

    # Phase 3 - Split the text
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    persist_directory = "./custom_dbs_fullScale_cosine/"

    # User input
    arr_chunk_size = [700]     # Chunk size
    arr_chunk_overlap = [200]  # Chunk overlap

    for chunk_size in arr_chunk_size:
        for chunk_overlap in arr_chunk_overlap:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=[" ", ",", "\n", ". "],
            )
            data_splits = text_splitter.split_documents(docs_data)
            log.debugLog('\nDocuments split into chunks...', state=state)

            # Phase 4 - Build the Chroma database
            log.debugLog('\nInitializing Chroma Database...', state=state)
            db_name = "custom_DB_cosine_cSize_%d_cOver_%d" % (chunk_size, chunk_overlap)

            # Create the database directory directly instead of shelling out
            # to `mkdir <path>/*`, which created a literal `*` directory.
            os.makedirs(persist_directory + db_name, exist_ok=True)

            _client_settings = chromadb.PersistentClient(path=(persist_directory + db_name))
            vectordb = Chroma.from_documents(
                documents=data_splits,
                embedding=embeddings_model,
                client=_client_settings,
                collection_name=db_name,
                collection_metadata={"hnsw:space": "cosine"},
            )
            log.debugLog('Completed Chroma Database: ' + str(db_name), state=state)

            # Drop the large per-configuration objects before the next round.
            del vectordb, text_splitter, data_splits
def arxiv_scrape(pdf_urls, state):
    """
    Download PDFs from a list of URLs pointing to arXiv articles.

    Each PDF is written to the directory given by
    ``utils.pdfDownloadPath(state)`` as ``<paper id>.pdf``, where the paper id
    is the last path segment of the URL. Downloads that fail with a request
    error are logged and skipped.

    :param pdf_urls: A list of URLs pointing to arXiv articles in PDF format.
    :type pdf_urls: list
    :param state: The status of the chat.
    :type state: dict
    :return: The attempted URLs, one per line (each followed by a newline).
    :rtype: str
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    s = HTMLSession()

    # Resolve the path outside the try block so that the error message below
    # can always refer to it.
    path = utils.pdfDownloadPath(state)  # os.path.abspath(os.getcwd()) + '/specialized_docs'
    try:
        os.makedirs(path, exist_ok=True)
        log.debugLog("Directory '%s' created successfully" % path, state=state)
    except OSError:
        log.debugLog("Directory '%s' can not be created" % path, state=state)

    # NOTE(review): the original defined a browser user-agent header here but
    # never sent it; requests are still made without it.
    attempted = []
    for papers in pdf_urls:
        attempted.append(papers + "\n")

        # Use the last URL segment as the file name. Slicing the last ten
        # characters (as before) gives the same result for plain new-style
        # ids such as 2401.12345, but truncates versioned ids and can pull a
        # '/' into the name for shorter ones.
        paper_id = papers.rstrip('/').rsplit('/', 1)[-1]
        if paper_id.endswith('.pdf'):
            paper_id = paper_id[:-len('.pdf')]
        if not paper_id:
            paper_id = papers[-10:]

        try:
            r = s.get(papers, stream=True)
            with open(os.path.join(path, paper_id + '.pdf'), 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        except requests.exceptions.RequestException:
            # The original handler referenced an undefined name (`pmc`) and
            # would itself have failed with NameError.
            log.debugLog(f"{papers} could not be gathered.", state=state)

    return "".join(attempted)
def result_set_to_string(result_set):
    """
    Join the stripped text of every element of a result set with single spaces.

    :param result_set: The elements to convert; each must provide
        ``get_text(strip=True)``.
    :type result_set: bs4.element.ResultSet
    :return: The texts of all elements separated by one space.
    :rtype: str
    """
    # Auth: Marc Choi
    #       machoi@umich.edu
    texts = []
    for node in result_set:
        texts.append(node.get_text(strip=True))
    return ' '.join(texts)
def parse_llm_response(response):
    """
    Parse the LLM response to extract the database name and search terms.

    The response is expected to contain a ``Database: <name>`` line and a
    ``Search Terms: <a>, <b>, ...`` line. Labelled lines are located wherever
    they appear (case-insensitive), so blank lines or a short preamble do not
    break parsing. When a label is missing, the first / second non-blank line
    is used instead, as in the original positional format. Missing
    information yields an empty database name or an empty term list rather
    than an ``IndexError``.

    :param response: The response from the LLM.
    :type response: str
    :returns: A dictionary with the keys ``"database"`` (str) and
        ``"search_terms"`` (list of non-empty str).
    :rtype: dict
    """
    # Ignore blank lines so that spacing in the reply does not shift fields.
    lines = [line.strip() for line in response.strip().split('\n') if line.strip()]

    def _field(label, position):
        # Prefer the explicitly labelled line ...
        prefix = label.lower()
        for line in lines:
            if line.lower().startswith(prefix):
                return line[len(label):].strip()
        # ... and fall back to the positional layout otherwise.
        if position < len(lines):
            return lines[position].replace(label, "").strip()
        return ""

    database = _field("Database:", 0)
    search_terms_line = _field("Search Terms:", 1)

    # Drop empty entries such as those produced by "a,, b" or a trailing comma.
    search_terms = [term.strip() for term in search_terms_line.split(',') if term.strip()]

    return {"database": database, "search_terms": search_terms}
def updateDatabase(state):
    """
    Add the downloaded PDF documents to the RAG database.

    .. warning:: This function contains hardcoded values related to text
       chunking.

    All PDFs below ``utils.pdfDownloadPath(state)`` are loaded, split into
    chunks, and added (text plus metadata) to ``state['databases']['RAG']``.
    Nothing happens when no RAG database is configured, when the download
    directory does not exist, or when no chunks were produced.

    :param state: The current chat status containing database information
        and other parameters.
    :type state: dict
    :return: The chat status after adding new documents to the database.
    :rtype: dict
    """
    # Auth: Joshua Pickard
    #       jpic@umich.edu
    # Date: June 27, 2024

    # Determine which documents need to be added to the database
    new_docs_path = utils.pdfDownloadPath(state)

    if state['databases']['RAG'] is None:
        return state
    if not os.path.isdir(new_docs_path):
        return state

    # Warning! these values are hard coded
    chunk_size = [700]
    chunk_overlap = [200]

    # Load all the documents
    new_loader = DirectoryLoader(
        new_docs_path,
        glob="**/*.pdf",
        loader_cls=PyPDFLoader,
        show_progress=True,
        use_multithreading=True,
    )
    new_docs_data = new_loader.load()
    print('\nNew documents loaded...')

    # Split the new document into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size[0],
        chunk_overlap=chunk_overlap[0],
        separators=[" ", ",", "\n", ". "],
    )
    new_data_splits = text_splitter.split_documents(new_docs_data)
    print("New document split into chunks...")

    # Format the document splits to be placed into the database
    docs = [doc.page_content for doc in new_data_splits]
    meta = [doc.metadata for doc in new_data_splits]

    # Add to the database
    log.debugLog('Adding texts to database', state)
    log.debugLog(f'len(docs)={len(docs)}', state)
    if len(docs) == 0:
        log.debugLog('exiting updateDatabase() because no new docs were found', state)
        return state

    # The keyword for per-text metadata is `metadatas`; the original passed
    # `meta=`, so the metadata was never attached to the stored chunks.
    # NOTE(review): assumes the RAG database follows the LangChain
    # vector-store `add_texts` interface -- confirm.
    state['databases']['RAG'].add_texts(texts=docs, metadatas=meta)
    log.debugLog('Done adding texts to database', state)
    return state