Enhancing Data Interaction with Cortex and Document AI
With Snowflake Cortex, we’ve gained powerful tools for implementing sophisticated AI solutions directly within our data ecosystem. This post will take you beyond the basics, demonstrating how to build a comprehensive solution that integrates document AI, LLM Chat, and interactive visualizations — all within Snowflake.
The solution I discuss here downloads PDFs from publicly available repositories and gives users access to relevant details as chunks of text, along with references to the original documents. Cortex handles the data interaction and Document AI extracts key metrics in structured form, which are then inserted into an in-house repository for various business purposes. I have used the USDA public dataset to showcase the approach.
Solution Overview
Our solution consists of 4 components:
- Web scraping and data ingestion
- Document AI for structured data extraction
- LLM chat for data interaction
- Streamlit app for visualization and user interaction
Let’s dive into each component. Before that, set up the network rule and external access integrations which will allow us to access our source URLs inside the Snowflake account.
Configuring Egress Network Rules for Data Ingestion in Snowflake
The network rule below (My_outbound_network_rule_pdf) allows outbound network traffic to the listed hosts, here usda.library.cornell.edu and downloads.usda.library.cornell.edu. Each entry in a HOST_PORT rule is a host name with an optional port. This configuration restricts external access for data transfers or API calls to those destinations.
CREATE OR REPLACE NETWORK RULE My_outbound_network_rule_pdf
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('downloads.usda.library.cornell.edu','usda.library.cornell.edu');
Configuring External Access Integration for Secure Data Retrieval
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION www_access_integration
ALLOWED_NETWORK_RULES = (My_outbound_network_rule_pdf)
ENABLED = true;
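Because the egress rule only allows the listed hosts, it can help to check your URLs against that list before calling the procedure. The following is a minimal sketch, not part of the original solution: the helper name is_allowed is hypothetical, and the exact-match comparison is an assumption made for illustration rather than a description of how Snowflake evaluates the rule.

```python
from urllib.parse import urlparse

# Hosts copied from the VALUE_LIST of My_outbound_network_rule_pdf above.
ALLOWED_HOSTS = {"downloads.usda.library.cornell.edu", "usda.library.cornell.edu"}


def is_allowed(url: str, allowed_hosts=ALLOWED_HOSTS) -> bool:
    """Return True if the URL's host name appears in the allow-list (exact match)."""
    host = urlparse(url).hostname  # None for relative links such as "/a/b.pdf"
    return host is not None and host.lower() in allowed_hosts


if __name__ == "__main__":
    for url in (
        "https://usda.library.cornell.edu/concern/publications/j098zb09z?locale=en",
        "https://example.com/report.pdf",
    ):
        print(url, is_allowed(url))
```

A URL on a host that is missing from the list would be blocked when the stored procedure tries to fetch it, so a check like this can surface the problem earlier.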
1. Web Scraping and Data Ingestion
To start our pipeline, we’ve implemented a Python stored procedure for fetching PDF documents from the USDA website and storing them in a Snowflake internal stage.
This stored procedure fetches PDF links from a given URL and its subsequent pages, downloads those PDFs, and uploads them to a Snowflake stage (@DOCS). It uses threading to manage multiple downloads concurrently and skips PDFs that are already on the stage.
It is important to create the stage with ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') so that the later Document AI and LLM operations can read the files.
create or replace stage docs ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE') DIRECTORY = ( ENABLE = true );
CREATE OR REPLACE PROCEDURE usfd_python_file_pro(start_url STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'fetch_pdfs_and_store_pro'
EXTERNAL_ACCESS_INTEGRATIONS = (www_access_integration)
PACKAGES = ('requests', 'beautifulsoup4', 'snowflake-snowpark-python')
AS
$$
import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin, urlparse
from threading import Thread, Lock

def get_pdf_links(url):
    # Collect every link on the page whose target ends in .pdf
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    pdf_links = [link['href'] for link in soup.find_all('a', href=True) if link['href'].lower().endswith('.pdf')]
    return pdf_links

def get_next_page_link(page_url):
    # Follow the rel="next" pagination link, if the page has one
    response = requests.get(page_url)
    soup = BeautifulSoup(response.content, 'html.parser')
    next_page_link = soup.find('a', rel='next', href=True)
    if next_page_link:
        next_page_href = next_page_link['href']
        if next_page_href.startswith('http'):
            return next_page_href
        else:
            return urljoin(page_url, next_page_href)
    return None

def is_file_uploaded(session, filename):
    # LIST returns one row per matching file already on the stage
    query = f"LIST @DOCS/{filename}"
    try:
        result = session.sql(query).collect()
        return len(result) > 0
    except Exception as e:
        print(f"Error checking file existence: {str(e)}")
        return False

def download_and_upload_pdf(session, pdf_url, lock):
    pdf_name = os.path.basename(urlparse(pdf_url).path)
    try:
        # Serialize the existence check so threads do not query the stage at once
        with lock:
            if is_file_uploaded(session, pdf_name):
                print(f"Skipping already uploaded PDF: {pdf_name}")
                return
        response = requests.get(pdf_url)
        if response.status_code == 200:
            print(f"Downloading PDF: {pdf_url}")
            tmp_dir = '/tmp/snowflake/'
            os.makedirs(tmp_dir, exist_ok=True)
            local_file_path = os.path.join(tmp_dir, pdf_name)
            with open(local_file_path, 'wb') as f:
                f.write(response.content)
            stage_path = f"@DOCS/{pdf_name}"
            put_result = session.file.put(f"file://{local_file_path}", stage_path, auto_compress=False)
            if put_result[0].status == 'UPLOADED':
                print(f"Uploaded PDF: {pdf_name} to Snowflake stage")
            else:
                print(f"Failed to upload PDF: {pdf_name}")
            os.remove(local_file_path)
        else:
            print(f"Failed to download PDF from {pdf_url}. Status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching PDF from URL: {pdf_url} - {str(e)}")

def fetch_pdfs_and_store_pro(session, start_url):
    page_url = start_url
    lock = Lock()
    threads = []
    max_threads = 10
    while page_url:
        pdf_links = get_pdf_links(page_url)
        print(f"PDF Links on {page_url}: {pdf_links}")
        for pdf_link in pdf_links:
            pdf_url = pdf_link if pdf_link.startswith('http') else urljoin('https://usda.library.cornell.edu', pdf_link)
            thread = Thread(target=download_and_upload_pdf, args=(session, pdf_url, lock))
            thread.start()
            threads.append(thread)
            # Wait for the current batch before starting more threads
            if len(threads) >= max_threads:
                for thread in threads:
                    thread.join()
                threads = []
        page_url = get_next_page_link(page_url)
    # Wait for any threads left over from the last batch
    for thread in threads:
        thread.join()
    return "PDF metadata fetching and storing completed"
$$;
CALL usfd_python_file_pro('https://usda.library.cornell.edu/concern/publications/j098zb09z?locale=en'); -- test call
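The procedure above starts threads in batches of ten and joins each batch by hand. The same bounded concurrency could be expressed with a thread pool from the standard library. This is only a sketch of that alternative, not the procedure's actual code: fake_download is a hypothetical stand-in for download_and_upload_pdf, and whether a Snowpark session can safely be shared across worker threads is not addressed here.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_download(pdf_url: str) -> str:
    """Hypothetical stand-in for download_and_upload_pdf; returns the file name."""
    return pdf_url.rsplit("/", 1)[-1]


def process_all(pdf_urls, worker=fake_download, max_threads=10):
    """Run worker over every URL with at most max_threads running at once."""
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # map() keeps the input order and re-raises worker exceptions here.
        return list(pool.map(worker, pdf_urls))


if __name__ == "__main__":
    print(process_all(["https://example.org/a.pdf", "https://example.org/b.pdf"]))
```

A pool keeps up to max_threads downloads busy continuously, whereas the batch approach waits for the slowest download in each batch before starting the next one.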
2. Setting Up Streams to Capture Changes
Next, create a stream (ACRE_PDF_STREAM) on the @DOCS stage to capture changes to data, enabling efficient tracking of new, modified, or deleted files on the stage. Setting up this stream allows for monitoring and processing of PDF files as they are added to the DOCS stage.
CREATE or REPLACE STREAM DOCAI_LLM_ML.AI_ML.ACRE_PDF_STREAM ON STAGE DOCS;
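To build intuition for what the stream provides, here is a toy Python model, assuming a much simplified view of stream behavior: it only remembers which file names have already been consumed. The class StageStreamModel and its methods are hypothetical and are not a Snowflake API.

```python
class StageStreamModel:
    """Toy model of a stream on a stage: remembers which files were consumed."""

    def __init__(self):
        self._seen = set()

    def pending(self, stage_files):
        """Files on the stage that have not been consumed yet, sorted by name."""
        return sorted(set(stage_files) - self._seen)

    def consume(self, stage_files):
        """Return the pending files and advance the offset past them."""
        new_files = self.pending(stage_files)
        self._seen.update(new_files)
        return new_files


if __name__ == "__main__":
    stream = StageStreamModel()
    print(stream.consume(["a.pdf", "b.pdf"]))           # first read sees both files
    print(stream.consume(["a.pdf", "b.pdf", "c.pdf"]))  # later read sees only c.pdf
```

In this model, as with a real stream read inside a DML statement, consuming the changes advances the offset, so a second reader of the same stream would not see files the first reader has already processed.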
3. Key Metrics Extraction Using Document AI
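Create a table to capture the extracted data. Note that the procedure and the Streamlit app later in this post refer to a table named FARM_DATA with the same columns, so use one table name consistently in your own setup.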
CREATE OR REPLACE TABLE DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION (
file_name STRING,
file_size INTEGER,
last_modified TIMESTAMP_NTZ,
snowflake_file_url STRING,
ocrScore FLOAT,
acres_score FLOAT,
acres_value STRING,
states_score FLOAT,
states_value STRING,
CornHarvested_score FLOAT,
CornHarvested_value STRING,
CornPlanted_score FLOAT,
CornPlanted_value STRING,
WheatHarvested_score FLOAT,
WheatHarvested_value STRING,
WheatPlanted_score FLOAT,
WheatPlanted_value STRING
);
The DOCAI_PREDICTION procedure processes newly inserted PDF files from the ACRE_PDF_STREAM and runs each one through the Document AI model to extract the fields of interest along with their confidence scores. It then inserts this data into the DOCAI_LLM_ML.AI_ML.FARM_DATA table, including metadata like file name, size, and prediction scores. Finally, it counts the rows in DOCAI_PDF_PREDICTION and returns that count in a status string.
CREATE OR REPLACE PROCEDURE DOCAI_PREDICTION ()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
file_count INTEGER;
BEGIN
-- Initialize file_count variable to 0
file_count := 0;
-- Insert data into the table
INSERT INTO DOCAI_LLM_ML.AI_ML.FARM_DATA (
file_name,
file_size,
last_modified,
snowflake_file_url,
ocrScore,
acres_score,
acres_value,
states_score,
states_value,
CornHarvested_score,
CornHarvested_value,
CornPlanted_score,
CornPlanted_value,
WheatHarvested_score,
WheatHarvested_value,
WheatPlanted_score,
WheatPlanted_value
)
WITH temp AS (
SELECT
RELATIVE_PATH AS file_name,
SIZE AS file_size,
LAST_MODIFIED::TIMESTAMP_NTZ(9) AS last_modified,
GET_PRESIGNED_URL('@DOCS', RELATIVE_PATH) AS snowflake_file_url,
DOCAI_LLM_ML.AI_ML.DOCAI_PEDICTION_MODEL!PREDICT(GET_PRESIGNED_URL('@DOCS', RELATIVE_PATH), 4) AS json_prediction
FROM ACRE_PDF_STREAM
WHERE METADATA$ACTION = 'INSERT'
)
SELECT
file_name,
file_size,
last_modified,
snowflake_file_url,
json_prediction:__documentMetadata.ocrScore::FLOAT AS ocrScore,
json_prediction:acres[0].score::FLOAT AS acres_score,
json_prediction:acres[0].value::STRING AS acres_value,
json_prediction:states[0].score::FLOAT AS states_score,
json_prediction:states[0].value::STRING AS states_value,
json_prediction:CornHarvested[0].score::FLOAT AS CornHarvested_score,
json_prediction:CornHarvested[0].value::STRING AS CornHarvested_value,
json_prediction:CornPlanted[0].score::FLOAT AS CornPlanted_score,
json_prediction:CornPlanted[0].value::STRING AS CornPlanted_value,
json_prediction:WheatHarvested[0].score::FLOAT AS WheatHarvested_score,
json_prediction:WheatHarvested[0].value::STRING AS WheatHarvested_value,
json_prediction:WheatPlanted[0].score::FLOAT AS WheatPlanted_score,
json_prediction:WheatPlanted[0].value::STRING AS WheatPlanted_value
FROM temp;
-- Set file_count to the number of unique files processed and inserted
file_count := (SELECT COUNT(*) FROM DOCAI_LLM_ML.AI_ML.DOCAI_PDF_PREDICTION);
-- Return a string with the count information
RETURN 'Inserted ' || file_count::STRING || ' files into DOCAI_PDF_PREDICTION table.';
END;
$$;
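The path expressions in the SELECT above (for example json_prediction:acres[0].score) suggest that each extracted field is a list of answers, each with a score and a value. Here is a minimal Python sketch of the same flattening, assuming that response shape. The function name flatten_prediction is hypothetical, and the sample values are invented for illustration; they are not real USDA figures or real model output.

```python
import json

# Field names taken from the SELECT list of the DOCAI_PREDICTION procedure.
FIELDS = ["acres", "states", "CornHarvested", "CornPlanted",
          "WheatHarvested", "WheatPlanted"]


def flatten_prediction(prediction: dict) -> dict:
    """Flatten one prediction into a row: ocrScore plus score/value per field."""
    row = {"ocrScore": prediction.get("__documentMetadata", {}).get("ocrScore")}
    for field in FIELDS:
        answers = prediction.get(field) or [{}]  # missing field -> empty answer
        first = answers[0]                       # mirrors the [0] index in the SQL
        row[f"{field}_score"] = first.get("score")
        row[f"{field}_value"] = first.get("value")
    return row


if __name__ == "__main__":
    # Invented sample in the assumed shape.
    sample = json.loads("""
    {
      "__documentMetadata": {"ocrScore": 0.93},
      "acres": [{"score": 0.88, "value": "1,000"}],
      "states": [{"score": 0.91, "value": "Iowa"}]
    }
    """)
    print(flatten_prediction(sample))
```

Fields that are absent from the response come back as None here, which corresponds to the NULL values the SQL path expressions would produce.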
4. Creating Task to Materialize the Model Output
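Create a task, ACRE_PDF_TASK, that runs the DOCAI_PREDICTION procedure every 60 minutes. Any new documents captured by the stream are run through the model and their prediction data is ingested into the DOCAI_PDF_PREDICTION table. Tasks are created in a suspended state, so run ALTER TASK ACRE_PDF_TASK RESUME to start the schedule.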
CREATE OR REPLACE TASK ACRE_PDF_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '60 minutes'
COMMENT = 'Task to insert predictions into DOCAI_PDF_PREDICTION table'
AS
CALL DOCAI_PREDICTION();
5. LLM Chat with Data
Processing and Chunking PDF Text
The user-defined table function (UDTF) pdf_text_chunker processes PDF files by extracting text and splitting it into chunks. Its handler class has two methods:
read_pdf Method: Reads the PDF file from the Snowflake stage, extracts text using PyPDF2, and logs a warning when a page cannot be extracted.
process Method: Splits the extracted text into chunks using RecursiveCharacterTextSplitter with a 4000-character chunk size and a 400-character overlap. It converts the chunks into a DataFrame and yields them one row at a time for database insertion.
Importance of Chunking and Overlapping
Chunking: Large Language Models (LLMs) have a limit on the number of tokens they can process at once. By chunking the text, we ensure that the input size remains manageable and within the model’s limits.
Overlapping: Overlapping ensures that context is preserved across chunks. This is crucial for maintaining the continuity of information, which is especially important for models that rely on contextual understanding to make accurate predictions or generate coherent responses. Without overlapping, the model might lose important context at the boundaries of chunks.
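To make the effect of overlap concrete, here is a simplified fixed-width chunker in plain Python. It is a sketch for illustration only: it is not how RecursiveCharacterTextSplitter works internally (that splitter also tries to break on separators such as paragraphs and sentences), and the function name chunk_text is hypothetical.

```python
def chunk_text(text: str, chunk_size: int = 4000, chunk_overlap: int = 400):
    """Split text into windows of chunk_size characters; neighbours share
    chunk_overlap characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


if __name__ == "__main__":
    # Small numbers make the shared characters easy to see.
    print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))
```

With a chunk size of 4 and an overlap of 1, each chunk starts with the last character of the previous one, so a sentence cut at a boundary still appears partly in both chunks.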
create or replace function pdf_text_chunker(file_url string)
returns table (chunk varchar)
language python
runtime_version = '3.9'
handler = 'pdf_text_chunker'
packages = ('snowflake-snowpark-python','PyPDF2', 'langchain')
as
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")
        # Read the whole file into memory so PyPDF2 can seek within it
        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())
        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
            except Exception:
                text = "Unable to Extract"
                logger.warning(f"Unable to extract from file {file_url}, page {page}")
        return text

    def process(self, file_url: str):
        text = self.read_pdf(file_url)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 4000,  # Adjust this as you see fit
            chunk_overlap = 400,  # This lets chunks overlap, which helps keep them contextual
            length_function = len
        )
        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunks'])
        # Yield one single-column tuple per chunk
        yield from df.itertuples(index=False, name=None)
$$;
Create a Table to Store the Vector Embeddings
This script creates a table CHUNK_AI_ML to store metadata, text chunks, and vector embeddings of text extracted from PDF files, facilitating efficient storage and processing for machine learning applications in Snowflake.
CREATE OR REPLACE TABLE CHUNK_AI_ML(
RELATIVE_PATH VARCHAR(16777216),
SIZE NUMBER(38,0),
FILE_URL VARCHAR(16777216),
SCOPED_FILE_URL VARCHAR(16777216),
CHUNK VARCHAR(16777216),
CHUNK_VEC VECTOR(FLOAT, 768)
);
This code inserts data into the CHUNK_AI_ML table by selecting relevant metadata and text chunks from PDF files stored in the docs stage. It builds scoped URLs for the files, processes the text into chunks using the pdf_text_chunker function, and generates vector embeddings for each chunk using Snowflake’s EMBED_TEXT_768 function. This ensures the text data is ready for efficient AI/ML tasks.
insert into chunk_ai_ml(relative_path, size, file_url,
scoped_file_url, chunk, chunk_vec)
select relative_path,
size,
file_url,
build_scoped_file_url(@docs, relative_path) as scoped_file_url,
func.chunk as chunk,
SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2',chunk) as chunk_vec
from
directory(@docs),
TABLE(pdf_text_chunker(build_scoped_file_url(@docs, relative_path))) as func;
Automating Text Extraction and Embedding from PDF Files
Create a task named task_extract_chunk_vec_from_pdf that runs every minute if new data is detected in the ACRE_PDF_STREAM. The task inserts metadata, text chunks, and vector embeddings into the CHUNK_AI_ML table. It selects the necessary data from the ACRE_PDF_STREAM, processes the PDF files using the pdf_text_chunker function created earlier, and generates vector embeddings using the SNOWFLAKE.CORTEX.EMBED_TEXT_768 function. This automates the extraction, chunking, and embedding of text from new PDF files as they are added to the stream.
SNOWFLAKE.CORTEX.EMBED_TEXT_768 generates 768-dimensional vector embeddings from text. These embeddings are useful for various AI/ML tasks, such as text classification, clustering, and similarity search. By transforming text into numerical vectors, the function enables efficient and scalable text processing.
Other Options in Cortex
CORTEX.EMBED_TEXT_1024: Generates 1024-dimensional vector embeddings for scenarios that benefit from a larger, more detailed text representation.
CORTEX.CLASSIFY_TEXT: Classifies text into one of the categories you supply in the call.
CORTEX.EXTRACT_ANSWER: Extracts the answer to a question from a given piece of text.
CORTEX.SUMMARIZE: Returns a summary of the given text.
CREATE OR REPLACE TASK task_extract_chunk_vec_from_pdf
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 minute'
WHEN system$stream_has_data('ACRE_PDF_STREAM')
AS
INSERT INTO chunk_ai_ml(relative_path, size, file_url,
scoped_file_url, chunk, chunk_vec)
SELECT relative_path,
size,
file_url,
build_scoped_file_url(@docs, relative_path) AS scoped_file_url,
func.chunk AS chunk,
SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', chunk) AS chunk_vec
FROM ACRE_PDF_STREAM,
TABLE(pdf_text_chunker(build_scoped_file_url(@docs, relative_path))) AS func;
Building the RAG (Retrieval-Augmented Generation)
In RAG, a large language model (LLM) is combined with a retrieval step so that its responses are grounded in external data and are therefore more relevant and accurate. Here’s a brief explanation of how it works in this context:
Vector Similarity Search:
The get_similar_chunks function uses vector cosine similarity to find the most relevant text chunks from the chunk_ai_ml table based on the user’s question. This step retrieves contextual information from the PDF data stored in Snowflake.
Chat History and Question Summarization:
The get_chat_history function retrieves the chat history to maintain context across interactions.
The summarize_question_with_history function extends the user’s question with relevant chat history to ensure the generated query is contextually enriched.
Prompt Creation:
The create_prompt function constructs a prompt that includes the retrieved text chunks (context) and the extended question. It ensures that the language model has access to relevant information when generating responses.
Response Generation:
The complete function uses the snowflake.cortex.complete function to generate a response based on the constructed prompt. This leverages Snowflake Cortex’s capabilities to provide a context-aware answer.
def get_similar_chunks(question):
    # Rank stored chunks by cosine similarity to the embedded question
    cmd = """
        WITH results AS (
            SELECT RELATIVE_PATH,
                VECTOR_COSINE_SIMILARITY(chunk_ai_ml.chunk_vec,
                    SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', ?)) as similarity,
                chunk
            FROM chunk_ai_ml
            ORDER BY similarity DESC
            LIMIT ?
        )
        SELECT chunk, relative_path FROM results
    """
    df_chunks = session.sql(cmd, params=[question, num_chunks]).to_pandas()
    return df_chunks

def get_chat_history():
    # Last slide_window messages, excluding the question being answered
    start_index = max(0, len(st.session_state.messages) - slide_window)
    return st.session_state.messages[start_index:-1]

def summarize_question_with_history(chat_history, question):
    prompt = f"""
        Based on the chat history below and the question, generate a query that extends the question
        with the chat history provided. The query should be in natural language.
        Answer with only the query. Do not add any explanation.
        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
    """
    cmd = "SELECT snowflake.cortex.complete(?, ?) as response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    summary = df_response[0].RESPONSE.replace("'", "")
    return summary

def create_prompt(myquestion):
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history:
            question_summary = summarize_question_with_history(chat_history, myquestion)
            df_chunks = get_similar_chunks(question_summary)
        else:
            df_chunks = get_similar_chunks(myquestion)
    else:
        df_chunks = get_similar_chunks(myquestion)
        chat_history = ""
    # .str.replace strips quotes inside each chunk (Series.replace matches whole values only)
    prompt_context = "".join(df_chunks['CHUNK'].str.replace("'", ""))
    # Print reference files
    st.sidebar.subheader("Reference Files:")
    unique_files = set()
    for _, row in df_chunks.iterrows():
        file_name = row['RELATIVE_PATH']
        if file_name not in unique_files:
            st.sidebar.write(f"- {file_name}")
            unique_files.add(file_name)
    prompt = f"""
        You are an expert chat assistant that extracts information from the CONTEXT provided
        between <context> and </context> tags.
        You offer a chat experience considering the information included in the CHAT HISTORY
        provided between <chat_history> and </chat_history> tags.
        When answering the question contained between <question> and </question> tags
        be concise and do not hallucinate.
        If you don't have the information just say so.
        Do not mention the CONTEXT used in your answer.
        Do not mention the CHAT HISTORY used in your answer.
        <chat_history>
        {chat_history}
        </chat_history>
        <context>
        {prompt_context}
        </context>
        <question>
        {myquestion}
        </question>
        Answer:
    """
    return prompt

def complete(myquestion):
    prompt = create_prompt(myquestion)
    cmd = "SELECT snowflake.cortex.complete(?, ?) as response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    return df_response
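The retrieval step in get_similar_chunks relies on cosine similarity between the question embedding and each stored chunk embedding. The following plain-Python sketch shows that ranking on tiny two-dimensional vectors. It is for illustration only: the function names are hypothetical, the vectors are made up rather than real 768-dimensional embeddings, and returning 0.0 for a zero vector is my own choice, not documented Snowflake behavior.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # assumption: treat a zero vector as "not similar"
    return dot / (norm_a * norm_b)


def top_chunks(question_vec, rows, num_chunks=3):
    """rows is a list of (chunk_text, chunk_vec); return the closest chunk texts."""
    ranked = sorted(rows,
                    key=lambda row: cosine_similarity(row[1], question_vec),
                    reverse=True)  # same idea as ORDER BY similarity DESC
    return [chunk for chunk, _ in ranked[:num_chunks]]  # same idea as LIMIT


if __name__ == "__main__":
    rows = [("corn", [1.0, 0.0]), ("wheat", [0.0, 1.0]), ("both", [1.0, 1.0])]
    print(top_chunks([1.0, 0.0], rows, num_chunks=2))
```

Cosine similarity depends only on the direction of the vectors, not their length, which is why it is a common choice for comparing text embeddings.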
6. Streamlit App for Visualization and Interaction
This Streamlit application provides an interactive interface for users to explore and analyze structured data extracted using Document AI. It also features an AI-powered chatbot that retrieves relevant information from internal documents and passes it to an LLM, which presents the answer in a useful manner. By integrating both structured metrics and chat interactions on the same data, analysts, business users, and researchers can work from a single source of truth, making their tasks faster and easier.
import streamlit as st
from snowflake.snowpark.context import get_active_session
import pandas as pd
import plotly.express as px

# Initialize Streamlit page configuration
st.set_page_config(layout="wide")

# Get the current Snowflake session
session = get_active_session()

# Snowflake table details for Document AI
doc_ai_context = "docai_llm_ml.ai_ml"
doc_ai_source_table = "FARM_DATA"

# Default Values for LLM
num_chunks = 3
slide_window = 7
pd.set_option("max_colwidth", None)

# Function to fetch data from Snowflake for Document AI
def fetch_data():
    try:
        df = session.table(f"{doc_ai_context}.{doc_ai_source_table}").to_pandas()
        columns_to_show = ['OCRSCORE', 'ACRES_VALUE', 'ACRES_SCORE', 'STATES_VALUE', 'STATES_SCORE',
                           'CORNPLANTED_VALUE', 'CORNPLANTED_SCORE', 'CORNHARVESTED_VALUE', 'CORNHARVESTED_SCORE',
                           'WHEATPLANTED_VALUE', 'WHEATHARVESTED_VALUE']
        df_selected = df[columns_to_show]
        return df_selected
    except Exception as e:
        st.error(f"Error fetching data from Snowflake: {e}")
        return None

# Function to create comparison charts
def create_comparison_chart(df, column1, column2, title1, title2, chart_title):
    if column1 in df.columns and column2 in df.columns:
        # Reshape to long form: one row per (Type, Value) pair
        data = df[[column1, column2]].melt(var_name='Type', value_name='Value')
        fig = px.bar(data, x='Type', y='Value', title=chart_title, barmode='group')
        st.plotly_chart(fig)
    else:
        st.warning(f"Columns '{column1}' or '{column2}' not found in the data.")
# LLM functions
def get_similar_chunks(question):
    cmd = """
        WITH results AS (
            SELECT RELATIVE_PATH,
                VECTOR_COSINE_SIMILARITY(chunk_ai_ml.chunk_vec,
                    SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', ?)) as similarity,
                chunk
            FROM chunk_ai_ml
            ORDER BY similarity DESC
            LIMIT ?
        )
        SELECT chunk, relative_path FROM results
    """
    df_chunks = session.sql(cmd, params=[question, num_chunks]).to_pandas()
    return df_chunks

def get_chat_history():
    start_index = max(0, len(st.session_state.messages) - slide_window)
    return st.session_state.messages[start_index:-1]

def summarize_question_with_history(chat_history, question):
    prompt = f"""
        Based on the chat history below and the question, generate a query that extends the question
        with the chat history provided. The query should be in natural language.
        Answer with only the query. Do not add any explanation.
        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
    """
    cmd = "SELECT snowflake.cortex.complete(?, ?) as response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    summary = df_response[0].RESPONSE.replace("'", "")
    return summary

def create_prompt(myquestion):
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history:
            question_summary = summarize_question_with_history(chat_history, myquestion)
            df_chunks = get_similar_chunks(question_summary)
        else:
            df_chunks = get_similar_chunks(myquestion)
    else:
        df_chunks = get_similar_chunks(myquestion)
        chat_history = ""
    # .str.replace strips quotes inside each chunk (Series.replace matches whole values only)
    prompt_context = "".join(df_chunks['CHUNK'].str.replace("'", ""))
    st.sidebar.subheader("Reference Files:")
    unique_files = set()
    for _, row in df_chunks.iterrows():
        file_name = row['RELATIVE_PATH']
        if file_name not in unique_files:
            st.sidebar.write(f"- {file_name}")
            unique_files.add(file_name)
    prompt = f"""
        You are an expert chat assistant that extracts information from the CONTEXT provided
        between <context> and </context> tags. You offer a chat experience considering the information
        included in the CHAT HISTORY provided between <chat_history> and </chat_history> tags.
        When answering the question contained between <question> and </question> tags
        be concise and do not hallucinate. If you don't have the information just say so.
        Do not mention the CONTEXT used in your answer.
        Do not mention the CHAT HISTORY used in your answer.
        <chat_history>
        {chat_history}
        </chat_history>
        <context>
        {prompt_context}
        </context>
        <question>
        {myquestion}
        </question>
        Answer:
    """
    return prompt

def complete(myquestion):
    prompt = create_prompt(myquestion)
    cmd = "SELECT snowflake.cortex.complete(?, ?) as response"
    df_response = session.sql(cmd, params=[st.session_state.model_name, prompt]).collect()
    return df_response
def config_options():
    st.sidebar.selectbox('Select your model:', (
        'mixtral-8x7b', 'snowflake-arctic', 'mistral-large', 'llama3-8b',
        'llama3-70b', 'reka-flash', 'mistral-7b', 'llama2-70b-chat', 'gemma-7b'
    ), key="model_name")
    st.sidebar.checkbox('Do you want that I remember the chat history?', key="use_chat_history", value=True)
    st.sidebar.button("Start Over", key="clear_conversation")
    st.sidebar.expander("Session State").write(st.session_state)

def init_messages():
    # Reset the conversation on first load or when "Start Over" is clicked
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []

def get_current_user_info():
    try:
        session = get_active_session()
        username = session.get_current_user()
        if not username:
            result = session.sql("SELECT CURRENT_USER() AS USERNAME").collect()
            username = result[0]['USERNAME']
        if not username:
            username = session.get_current_account().strip('\"')
        return username
    except Exception as e:
        st.error(f"Error fetching Snowflake username: {e}")
        return "Unknown User"

def display_response(question, model, user_avatar):
    with st.chat_message("user", avatar=user_avatar):
        st.markdown(question)
    with st.chat_message("assistant", avatar="❄️"):
        message_placeholder = st.empty()
        question = question.replace("'", "")
        with st.spinner(f"{model} thinking..."):
            response = complete(question)
            res_text = response[0].RESPONSE.replace("'", "")
            message_placeholder.markdown(res_text)
    st.session_state.messages.append({"role": "assistant", "content": res_text})
def main():
    st.title("Document AI and LLM Assistant")
    app_mode = st.sidebar.selectbox("Choose the app mode", ["LLM Assistant", "Document AI"])
    if app_mode == "Document AI":
        st.header("DOCUMENT AI PREDICTION MODEL")
        df = fetch_data()
        if df is not None:
            st.subheader("Selected Data")
            st.write(df)
            # Map each comparison label to the two columns it plots
            comparisons = {
                "Acres vs Corn Planted": ('ACRES_VALUE', 'CORNPLANTED_VALUE'),
                "Acres vs Corn Harvested": ('ACRES_VALUE', 'CORNHARVESTED_VALUE'),
                "Acres vs Wheat Planted": ('ACRES_VALUE', 'WHEATPLANTED_VALUE'),
                "Acres vs States": ('ACRES_VALUE', 'STATES_VALUE'),
                "Corn Planted vs Corn Harvested": ('CORNPLANTED_VALUE', 'CORNHARVESTED_VALUE'),
                "Corn Planted vs Wheat Planted": ('CORNPLANTED_VALUE', 'WHEATPLANTED_VALUE'),
                "Corn Harvested vs Wheat Harvested": ('CORNHARVESTED_VALUE', 'WHEATHARVESTED_VALUE'),
                "Wheat Planted vs Wheat Harvested": ('WHEATPLANTED_VALUE', 'WHEATHARVESTED_VALUE'),
                "Wheat Planted vs States": ('WHEATPLANTED_VALUE', 'STATES_VALUE'),
                "Wheat Harvested vs States": ('WHEATHARVESTED_VALUE', 'STATES_VALUE'),
            }
            comparison_type = st.selectbox("Select the type of comparison", list(comparisons))
            column1, column2 = comparisons[comparison_type]
            # The two chart titles are the two halves of the label
            title1, title2 = comparison_type.split(" vs ")
            create_comparison_chart(df, column1, column2, title1, title2, comparison_type)
    elif app_mode == "LLM Assistant":
        st.header("Chat Document Assistant with Snowflake Cortex")
        user_info = get_current_user_info()
        user_avatar = "👤"
        st.write("This is the list of documents available for answering your questions:")
        docs_available = session.sql("ls @docs").collect()
        list_docs = [doc["name"] for doc in docs_available]
        with st.expander("Available Documents", expanded=False):
            # Spread the document names across three columns
            columns = st.columns(3)
            for i, doc_name in enumerate(list_docs):
                with columns[i % 3]:
                    st.write(f"- {doc_name}")
        config_options()
        init_messages()
        # Replay the conversation so far
        for message in st.session_state.messages:
            if isinstance(message, dict) and "role" in message:
                if message["role"] == "user":
                    with st.chat_message("user", avatar=user_avatar):
                        st.markdown(message["content"])
                else:
                    with st.chat_message("assistant", avatar="❄️"):
                        st.markdown(message["content"])
        if question := st.chat_input("What do you want to know about your products?"):
            st.session_state.messages.append({"role": "user", "content": question})
            display_response(question, st.session_state.model_name, user_avatar)

if __name__ == "__main__":
    main()
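The sliding window used by get_chat_history in the app above is easy to check in isolation. This sketch repeats the same slicing logic as a standalone function that takes the message list as a parameter (my change, so that it runs without Streamlit); the sample messages are made up.

```python
def get_chat_history(messages, slide_window=7):
    """Return the most recent messages inside the window, minus the newest one."""
    start_index = max(0, len(messages) - slide_window)
    # The newest entry is the question being answered, so it is left out.
    return messages[start_index:-1]


if __name__ == "__main__":
    messages = [f"m{i}" for i in range(10)]  # made-up history, oldest first
    print(get_chat_history(messages))
    print(get_chat_history(["first question"]))
```

With slide_window = 7 and ten stored messages, the function returns six earlier messages (m3 to m8), because the slice drops the newest entry. On the first question of a conversation it returns an empty list, which is why create_prompt falls back to searching with the raw question in that case.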
Conclusion
This solution demonstrates how to combine Snowflake Cortex, Document AI, and Streamlit into an interactive, AI-driven data analysis platform. By integrating web scraping, structured data extraction, LLM chat functionality, and interactive visualizations, it gives users one place to explore and analyze their data. This approach improves data accessibility and ensures that all stakeholders work from a single source of truth, which supports faster and better-informed decisions. Although the example here uses USDA reports, the same pattern can be applied to other domains and document types. Bringing these AI capabilities into Snowflake opens up new possibilities for data-driven insights.