
Redshift

The database module provides a connection class for Amazon Redshift with utilities for running queries, listing tables, and managing connections.

When to Use

Use the database module when you need to:

  • Connect to Redshift (IQVIA, Optum databases)
  • Execute SQL queries and get DataFrames
  • List tables and columns in a schema
  • Run raw SQL for custom analysis

Quick Example

from alx_heor.database import RedshiftConnection

# Connect using environment variables
conn = RedshiftConnection().connect()

# Execute a query
df = conn.query("SELECT * FROM schema.table LIMIT 100")

# List tables in a schema
tables = conn.get_tables(schema="iqvia_pharmetrics_2024q3")

# Get columns for a table
columns = conn.get_columns(
    schema="iqvia_pharmetrics_2024q3",
    table="claims_2024"
)

# Close connection when done
conn.close()

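Connection Options

Environment Variables (Recommended)

Create a .env file (loaded with python-dotenv) or export the same variables in your shell:

REDSHIFT_HOST=your-cluster.redshift.amazonaws.com
REDSHIFT_DATABASE=your_database
REDSHIFT_USER=your_user
REDSHIFT_PASSWORD=your_password

RedshiftConnection does not read a port setting; redshift_connector connects on its default Redshift port, 5439. With the variables set, connect without arguments:

conn = RedshiftConnection().connect()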
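connect() raises a ValueError that names every missing setting. If you want to check your shell or .env setup before opening a connection, a small preflight helper can mirror that check. This is a sketch, not part of alx_heor: the function name missing_redshift_settings is hypothetical, and it only inspects the four variables listed above.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# The four settings RedshiftConnection falls back to when no argument is given.
REQUIRED_VARS = (
    "REDSHIFT_HOST",
    "REDSHIFT_DATABASE",
    "REDSHIFT_USER",
    "REDSHIFT_PASSWORD",
)


def missing_redshift_settings(env: Mapping[str, str] | None = None) -> list[str]:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    # An empty string counts as missing, matching the `if not self.host` style checks in connect().
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_redshift_settings()
    if missing:
        print("Set these before connecting:", ", ".join(missing))
    else:
        print("All Redshift settings present.")
```

Passing a plain dict instead of os.environ makes the helper easy to test without touching the real environment.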

Explicit Credentials

The constructor accepts host, database, user, and password; it has no port argument.

conn = RedshiftConnection(
    host="your-cluster.redshift.amazonaws.com",
    database="your_database",
    user="your_user",
    password="your_password",
).connect()

Common Patterns

Context Manager

from alx_heor.database import RedshiftConnection

with RedshiftConnection().connect() as conn:
    df = conn.query("SELECT COUNT(*) FROM schema.table")
    # Connection automatically closed after block
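The with form works because connect() returns the same object, whose __enter__ returns self and whose __exit__ calls close(). The stand-in class below (hypothetical FakeConnection, no database involved) reproduces that protocol so you can see the cleanup happen, including when the block raises:

```python
from __future__ import annotations


class FakeConnection:
    """Stand-in with the same connect/close/context-manager shape (no database)."""

    def __init__(self) -> None:
        self._conn: object | None = None

    def connect(self) -> FakeConnection:
        self._conn = object()  # placeholder for a real driver connection
        return self  # returning self is what allows `with X().connect() as conn`

    def close(self) -> None:
        self._conn = None

    @property
    def is_connected(self) -> bool:
        return self._conn is not None

    def __enter__(self) -> FakeConnection:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Runs on normal exit and when an exception escapes the block;
        # returning None lets the exception propagate after cleanup.
        self.close()


with FakeConnection().connect() as conn:
    assert conn.is_connected
assert not conn.is_connected  # closed as soon as the block ended

try:
    with FakeConnection().connect() as conn2:
        raise RuntimeError("query failed")
except RuntimeError:
    pass
assert not conn2.is_connected  # still closed even though the block raised
```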

Schema Exploration

# List schemas that contain at least one table, excluding pg_* system schemas
schemas = conn.query("""
    SELECT DISTINCT schemaname
    FROM pg_tables
    WHERE schemaname NOT LIKE 'pg_%'
    ORDER BY schemaname
""")

# List tables in schema
tables = conn.get_tables(schema="iqvia_pharmetrics_2024q3")
print(f"Found {len(tables)} tables")

# Get table structure
columns = conn.get_columns(
    schema="iqvia_pharmetrics_2024q3",
    table="claims_2024"
)
for col in columns:
    print(f"  {col}")  # get_columns() returns column names as plain strings
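get_columns() returns plain column names, and its query has no ORDER BY, so the order is not guaranteed. To see data types as well, in table order, you can query information_schema.columns yourself through conn.query(). The helper below is a hypothetical sketch that only builds the SQL; it doubles single quotes so a name containing a quote still forms one valid literal, but it is not a general-purpose escaping routine.

```python
def column_types_sql(schema: str, table: str) -> str:
    """Build a query listing each column's name and data type, in table order (hypothetical helper)."""

    def literal(value: str) -> str:
        # Standard SQL string literal: wrap in single quotes, double any embedded quote.
        return "'" + value.replace("'", "''") + "'"

    return (
        "SELECT column_name, data_type\n"
        "FROM information_schema.columns\n"
        f"WHERE table_schema = {literal(schema)}\n"
        f"  AND table_name = {literal(table)}\n"
        "ORDER BY ordinal_position"
    )


sql = column_types_sql("iqvia_pharmetrics_2024q3", "claims_2024")
print(sql)
```

With a live connection, df = conn.query(sql) returns one row per column, and for name, dtype in df.itertuples(index=False): print(f"  {name}: {dtype}") prints names with their types.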

Formatting IN Clauses

from alx_heor.database import format_in_clause

patient_ids = ["123", "456", "789"]
in_clause = format_in_clause(patient_ids)  # "'123', '456', '789'"

sql = f"SELECT * FROM schema.table WHERE pat_id IN ({in_clause})"
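The implementation of format_in_clause is not shown on this page. Assuming it quotes each value as a string literal and joins them with ", " (as the comment above suggests), a hypothetical equivalent could look like the sketch below. It also doubles embedded single quotes and rejects an empty list, because IN () is not valid SQL.

```python
from collections.abc import Iterable


def format_in_clause_sketch(values: Iterable[object]) -> str:
    """Render values as a comma-separated list of SQL string literals (hypothetical)."""
    items = [str(v) for v in values]
    if not items:
        # "IN ()" is not valid SQL, so fail early instead of sending a broken query.
        raise ValueError("format_in_clause_sketch needs at least one value")
    # Double embedded single quotes so each item stays a single literal.
    return ", ".join("'" + item.replace("'", "''") + "'" for item in items)


patient_ids = ["123", "456", "789"]
in_clause = format_in_clause_sketch(patient_ids)
print(in_clause)  # '123', '456', '789'
sql = f"SELECT * FROM schema.table WHERE pat_id IN ({in_clause})"
```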

Modules that use this connection:

  • claims - Uses connection for claims queries
  • enrollment - Uses connection for enrollment data
  • cohort - Uses connection throughout

redshift

Redshift database connector for AWS-hosted claims databases.

This module provides the database connection layer for the alx_heor library. Amazon Redshift is a cloud data warehouse where IQVIA, Optum, and other healthcare claims databases are commonly hosted.

Key Features:

  • Environment-based credential management (no hardcoded passwords)
  • Context manager support for automatic connection cleanup
  • Pandas DataFrame integration for query results
  • Helper methods for schema/table/column exploration
  • Table creation from SELECT queries or DataFrames

Security Model:

Credentials are loaded from environment variables to avoid storing sensitive information in code:

  • REDSHIFT_HOST: Cluster endpoint (e.g., 'cluster.xxxxx.us-east-1.redshift.amazonaws.com')
  • REDSHIFT_DATABASE: Database name
  • REDSHIFT_USER: Username
  • REDSHIFT_PASSWORD: Password

Set these in your environment or use a .env file with python-dotenv.

Usage Patterns:

Basic query:

>>> conn = RedshiftConnection().connect()
>>> df = conn.query("SELECT * FROM schema.table LIMIT 10")
>>> conn.close()

Context manager (recommended):

>>> with RedshiftConnection().connect() as conn:
...     df = conn.query("SELECT * FROM schema.table LIMIT 10")

Explore available data:

>>> conn = RedshiftConnection().connect()
>>> schemas = conn.get_schemas('iqvia')  # Find IQVIA schemas
>>> tables = conn.get_tables('iqvia_pharmetrics_2024q3')  # List tables
>>> columns = conn.get_columns('schema', 'claims_2024')  # List columns

See Also

config.get_source_config : Get column mappings for each data source
claims.get_claims : Query claims using this connection

Notes
  • Redshift is PostgreSQL-compatible but not identical
  • Large queries can take minutes - use LIMIT for testing
  • Consider using write_from_select() for intermediate results
  • The library requires redshift-connector: pip install redshift-connector

RedshiftConnection

Redshift database connection with environment-based credentials.

This class provides a clean interface to Amazon Redshift for healthcare claims data analysis. It handles connection management, query execution, and result conversion to pandas DataFrames.

The class supports two usage patterns:

  1. Manual management: Call connect() and close() explicitly
  2. Context manager: Automatic cleanup in a with block (recommended)

Credentials can be provided directly or loaded from environment variables:

  • REDSHIFT_HOST: Cluster endpoint
  • REDSHIFT_DATABASE: Database name
  • REDSHIFT_USER: Username
  • REDSHIFT_PASSWORD: Password

Workflow:

  1. Create connection: conn = RedshiftConnection().connect()
  2. Explore schemas: conn.get_schemas('iqvia')
  3. Explore tables: conn.get_tables('iqvia_pharmetrics_2024q3')
  4. Query data: df = conn.query("SELECT ...")
  5. Close connection: conn.close() (or use context manager)

Key Methods:

  • query(): Execute SQL, return DataFrame
  • execute(): Execute SQL without returning results
  • get_schemas(): List available schemas
  • get_tables(): List tables in a schema
  • get_columns(): List columns in a table
  • write_table(): Write DataFrame to table
  • write_from_select(): Create table from SELECT query
  • drop_table(): Drop a table if it exists

See Also

claims.get_claims : High-level function that uses this connection
config.get_source_config : Column mappings for each data source

Notes
  • Always close connections when done (or use context manager)
  • Large queries can take minutes - test with LIMIT first
  • The connection has autocommit enabled by default
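  • query(), execute(), write_table(), and write_from_select() call rollback() before running, clearing any aborted transaction left by an earlier failed statement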

Examples:

Basic usage with manual connection management:

>>> from alx_heor.database import RedshiftConnection
>>> conn = RedshiftConnection().connect()
>>> df = conn.query("SELECT COUNT(*) FROM iqvia_pharmetrics_2024q3.claims_2024")
>>> print(f"Row count: {df.iloc[0, 0]:,}")
>>> conn.close()

Recommended: Context manager for automatic cleanup:

>>> with RedshiftConnection().connect() as conn:
...     schemas = conn.get_schemas('iqvia')
...     print(f"Found {len(schemas)} IQVIA schemas")
...     df = conn.query("SELECT * FROM schema.table LIMIT 10")

Provide credentials directly (not recommended for production):

>>> conn = RedshiftConnection(
...     host='cluster.xxxxx.redshift.amazonaws.com',
...     database='mydb',
...     user='analyst',
...     password='secret',
... ).connect()

Explore available data:

>>> with RedshiftConnection().connect() as conn:
...     # Find IQVIA schemas
...     schemas = conn.get_schemas('iqvia')
...     print("Available schemas:", schemas[:5])
...
...     # List tables in a schema
...     tables = conn.get_tables('iqvia_pharmetrics_2024q3')
...     print(f"Tables: {len(tables)}")
...
...     # Check columns in claims table
...     cols = conn.get_columns('iqvia_pharmetrics_2024q3', 'claims_2024')
...     print(f"Columns: {cols[:10]}")
Source code in alx_heor\database\redshift.py
class RedshiftConnection:
    """Redshift database connection with environment-based credentials.

    This class provides a clean interface to Amazon Redshift for healthcare
    claims data analysis. It handles connection management, query execution,
    and result conversion to pandas DataFrames.

    The class supports two usage patterns:
    1. **Manual management**: Call connect() and close() explicitly
    2. **Context manager**: Automatic cleanup with `with` statement (recommended)

    Credentials can be provided directly or loaded from environment variables:
    - REDSHIFT_HOST: Cluster endpoint
    - REDSHIFT_DATABASE: Database name
    - REDSHIFT_USER: Username
    - REDSHIFT_PASSWORD: Password

    **Workflow:**

    1. Create connection: `conn = RedshiftConnection().connect()`
    2. Explore schemas: `conn.get_schemas('iqvia')`
    3. Explore tables: `conn.get_tables('iqvia_pharmetrics_2024q3')`
    4. Query data: `df = conn.query("SELECT ...")`
    5. Close connection: `conn.close()` (or use context manager)

    **Key Methods:**

    - query(): Execute SQL, return DataFrame
    - execute(): Execute SQL without returning results
    - get_schemas(): List available schemas
    - get_tables(): List tables in a schema
    - get_columns(): List columns in a table
    - write_table(): Write DataFrame to table
    - write_from_select(): Create table from SELECT query

    See Also
    --------
    claims.get_claims : High-level function that uses this connection
    config.get_source_config : Column mappings for each data source

    Notes
    -----
    - Always close connections when done (or use context manager)
    - Large queries can take minutes - test with LIMIT first
    - The connection has autocommit enabled by default
    - Failed queries are automatically rolled back before retry

    Examples
    --------
    Basic usage with manual connection management:

    >>> from alx_heor.database import RedshiftConnection
    >>> conn = RedshiftConnection().connect()
    >>> df = conn.query("SELECT COUNT(*) FROM iqvia_pharmetrics_2024q3.claims_2024")
    >>> print(f"Row count: {df.iloc[0, 0]:,}")
    >>> conn.close()

    Recommended: Context manager for automatic cleanup:

    >>> with RedshiftConnection().connect() as conn:
    ...     schemas = conn.get_schemas('iqvia')
    ...     print(f"Found {len(schemas)} IQVIA schemas")
    ...     df = conn.query("SELECT * FROM schema.table LIMIT 10")

    Provide credentials directly (not recommended for production):

    >>> conn = RedshiftConnection(
    ...     host='cluster.xxxxx.redshift.amazonaws.com',
    ...     database='mydb',
    ...     user='analyst',
    ...     password='secret',
    ... ).connect()

    Explore available data:

    >>> with RedshiftConnection().connect() as conn:
    ...     # Find IQVIA schemas
    ...     schemas = conn.get_schemas('iqvia')
    ...     print("Available schemas:", schemas[:5])
    ...
    ...     # List tables in a schema
    ...     tables = conn.get_tables('iqvia_pharmetrics_2024q3')
    ...     print(f"Tables: {len(tables)}")
    ...
    ...     # Check columns in claims table
    ...     cols = conn.get_columns('iqvia_pharmetrics_2024q3', 'claims_2024')
    ...     print(f"Columns: {cols[:10]}")
    """

    def __init__(
        self,
        host: str | None = None,
        database: str | None = None,
        user: str | None = None,
        password: str | None = None,
    ) -> None:
        """Initialize connection parameters.

        Parameters
        ----------
        host : str, optional
            Redshift cluster endpoint. Falls back to REDSHIFT_HOST env var.
        database : str, optional
            Database name. Falls back to REDSHIFT_DATABASE env var.
        user : str, optional
            Username. Falls back to REDSHIFT_USER env var.
        password : str, optional
            Password. Falls back to REDSHIFT_PASSWORD env var.
        """
        self.host = host or os.getenv("REDSHIFT_HOST")
        self.database = database or os.getenv("REDSHIFT_DATABASE")
        self.user = user or os.getenv("REDSHIFT_USER")
        self.password = password or os.getenv("REDSHIFT_PASSWORD")
        self._conn: Any = None
        self._cursor: Any = None

    def connect(self) -> RedshiftConnection:
        """Establish connection to Redshift.

        Returns
        -------
        RedshiftConnection
            Self, for method chaining.

        Raises
        ------
        ValueError
            If required connection parameters are missing.
        """
        missing = []
        if not self.host:
            missing.append("host (or REDSHIFT_HOST)")
        if not self.database:
            missing.append("database (or REDSHIFT_DATABASE)")
        if not self.user:
            missing.append("user (or REDSHIFT_USER)")
        if not self.password:
            missing.append("password (or REDSHIFT_PASSWORD)")

        if missing:
            raise ValueError(f"Missing required connection parameters: {', '.join(missing)}")

        if not HAS_REDSHIFT:
            raise ImportError(
                "redshift_connector is required for database connections. "
                "Install it with: pip install redshift-connector"
            )

        self._conn = redshift_connector.connect(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
        )
        self._conn.autocommit = True
        self._cursor = self._conn.cursor()
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute SQL query and return results as a pandas DataFrame.

        This is the primary method for retrieving data from Redshift. The SQL
        is executed, results are fetched, and returned as a DataFrame with
        column names preserved from the query.

        Parameters
        ----------
        sql : str
            SQL query to execute. Can be any valid Redshift/PostgreSQL SELECT
            statement, including JOINs, CTEs, window functions, etc.

        Returns
        -------
        pd.DataFrame
            Query results as a DataFrame. Column names match the query output.
            Empty DataFrame if query returns no rows.

        Raises
        ------
        RuntimeError
            If not connected. Call connect() first.

        See Also
        --------
        execute : Execute SQL without returning results (DDL, DML)
        write_from_select : Create table from SELECT query

        Notes
        -----
        - Large queries can take minutes - test with LIMIT first
        - Previous failed queries are automatically rolled back
        - Memory usage depends on result size - consider chunking large results
        - Date columns are returned as datetime64, not strings

        Examples
        --------
        Simple query:

        >>> df = conn.query("SELECT * FROM schema.table LIMIT 10")
        >>> print(df.head())

        Query with JOIN:

        >>> sql = '''
        ...     SELECT c.pat_id, c.from_dt, e.pay_type
        ...     FROM schema.claims_2024 c
        ...     JOIN schema.enroll2_2024 e ON c.pat_id = e.pat_id
        ...     WHERE c.diag1 = 'G700'
        ...     LIMIT 1000
        ... '''
        >>> df = conn.query(sql)

        Query with CTE:

        >>> sql = '''
        ...     WITH cohort AS (
        ...         SELECT DISTINCT pat_id FROM claims WHERE diag1 = 'G700'
        ...     )
        ...     SELECT c.*, r.generic_name
        ...     FROM cohort c
        ...     JOIN rx_claims r ON c.pat_id = r.pat_id
        ... '''
        >>> df = conn.query(sql)
        """
        if not self._conn or not self._cursor:
            raise RuntimeError("Not connected. Call connect() first.")

        self._conn.rollback()  # Handle any previous failed queries
        self._cursor.execute(sql)
        return self._cursor.fetch_dataframe()

    def execute(self, sql: str) -> None:
        """Execute SQL statement without returning results.

        Parameters
        ----------
        sql : str
            SQL statement to execute.
        """
        if not self._conn or not self._cursor:
            raise RuntimeError("Not connected. Call connect() first.")

        self._conn.rollback()
        self._cursor.execute(sql)

    def get_tables(self, schema: str, keyword: str | None = None) -> list[str]:
        """List tables in a schema.

        Parameters
        ----------
        schema : str
            Schema name (e.g., 'iqvia_pharmetrics_2021q3').
        keyword : str, optional
            Filter tables containing this keyword.

        Returns
        -------
        list[str]
            List of table names.
        """
        sql = f"""
            SELECT t.table_name
            FROM information_schema.tables t
            WHERE t.table_schema = '{schema}'
              AND t.table_type = 'BASE TABLE'
            ORDER BY t.table_name
        """
        self._cursor.execute(sql)
        tables = [row[0] for row in self._cursor.fetchall()]

        if keyword:
            tables = [t for t in tables if keyword in t]

        return tables

    def get_columns(self, schema: str, table: str, keyword: str | None = None) -> list[str]:
        """List columns in a table.

        Parameters
        ----------
        schema : str
            Schema name.
        table : str
            Table name.
        keyword : str, optional
            Filter columns containing this keyword.

        Returns
        -------
        list[str]
            List of column names.
        """
        sql = f"""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = '{schema}'
              AND table_name = '{table}'
        """
        self._cursor.execute(sql)
        columns = [row[0] for row in self._cursor.fetchall()]

        if keyword:
            columns = [c for c in columns if keyword in c]

        return columns

    def close(self) -> None:
        """Close the database connection."""
        if self._conn:
            self._conn.close()
            self._conn = None
            self._cursor = None

    def __enter__(self) -> RedshiftConnection:
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit - close connection."""
        self.close()

    @property
    def is_connected(self) -> bool:
        """Check if connection is active."""
        return self._conn is not None

    def get_schemas(self, keyword: str | None = None) -> list[str]:
        """List schemas in the database.

        Parameters
        ----------
        keyword : str, optional
            Filter schemas containing this keyword (case-insensitive).

        Returns
        -------
        list[str]
            List of schema names.

        Example
        -------
        >>> conn.get_schemas('iqvia')
        ['iqvia_pharmetrics_2024q3', 'iqvia_pharmetrics_mortality_2024q3', ...]
        """
        if not self._conn or not self._cursor:
            raise RuntimeError("Not connected. Call connect() first.")

        sql = "SELECT nspname AS schema_name FROM pg_namespace"
        if keyword:
            sql += f" WHERE nspname LIKE '%{keyword.lower()}%'"
        sql += " ORDER BY nspname"

        self._cursor.execute(sql)
        return [row[0] for row in self._cursor.fetchall()]

    def drop_table(self, schema: str, table: str) -> None:
        """Drop a table if it exists.

        Parameters
        ----------
        schema : str
            Schema name (e.g., 'sbx_username').
        table : str
            Table name.

        Example
        -------
        >>> conn.drop_table('sbx_kskn306', 'temp_cohort')
        """
        if not self._conn or not self._cursor:
            raise RuntimeError("Not connected. Call connect() first.")

        sql = f"DROP TABLE IF EXISTS {schema}.{table}"
        self._cursor.execute(sql)
        self._conn.commit()

    def write_table(
        self,
        df: pd.DataFrame,
        schema: str,
        table: str,
        if_exists: str = "replace",
        chunksize: int = 5000,
    ) -> int:
        """Write a DataFrame to a Redshift table.

        Uses SQLAlchemy for efficient bulk inserts with chunking.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame to write.
        schema : str
            Target schema name.
        table : str
            Target table name.
        if_exists : str, default='replace'
            What to do if table exists: 'replace', 'append', or 'fail'.
        chunksize : int, default=5000
            Number of rows to insert at a time.

        Returns
        -------
        int
            Number of rows written.

        Example
        -------
        >>> conn.write_table(df_cohort, 'sbx_kskn306', 'my_cohort')
        1234
        """
        if not self._conn:
            raise RuntimeError("Not connected. Call connect() first.")

        try:
            from sqlalchemy import create_engine
        except ImportError:
            raise ImportError(
                "sqlalchemy is required for write_table. "
                "Install with: pip install sqlalchemy sqlalchemy-redshift psycopg2"
            )

        # Build connection string for SQLAlchemy
        conn_str = (
            f"redshift+psycopg2://{self.user}:{self.password}"
            f"@{self.host}:5439/{self.database}"
        )

        self._conn.rollback()
        engine = create_engine(conn_str)
        df.to_sql(
            table,
            engine,
            index=False,
            if_exists=if_exists,
            schema=schema,
            chunksize=chunksize,
            method="multi",
        )

        return len(df)

    def write_from_select(
        self,
        sql: str,
        schema: str,
        table: str,
        overwrite: bool = True,
    ) -> None:
        """Create a table from a SELECT query.

        Parameters
        ----------
        sql : str
            SELECT statement (without INTO clause).
        schema : str
            Target schema name.
        table : str
            Target table name.
        overwrite : bool, default=True
            If True, drop existing table first.

        Example
        -------
        >>> conn.write_from_select(
        ...     "SELECT pat_id, from_dt FROM claims WHERE diag1 = 'G35'",
        ...     'sbx_kskn306', 'ms_patients'
        ... )
        """
        if not self._conn or not self._cursor:
            raise RuntimeError("Not connected. Call connect() first.")

        if overwrite:
            self.drop_table(schema, table)

        # Build SELECT INTO statement
        into_sql = f"SELECT * INTO {schema}.{table} FROM ({sql}) AS subquery"
        self._conn.rollback()
        self._cursor.execute(into_sql)
        self._conn.commit()
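write_table() builds its SQLAlchemy URL by interpolating self.user and self.password directly into the string, with port 5439 hard-coded. SQLAlchemy's documentation notes that special characters in a URL password (such as @, : or /) must be percent-encoded, which the standard library's urllib.parse.quote_plus does. A hypothetical sketch of the same URL built with encoded credentials:

```python
from urllib.parse import quote_plus


def redshift_sqlalchemy_url(
    user: str, password: str, host: str, database: str, port: int = 5439
) -> str:
    """Build a redshift+psycopg2 URL with percent-encoded credentials (hypothetical helper)."""
    # quote_plus turns '@' into '%40', ':' into '%3A' and '/' into '%2F', so the
    # password cannot be mistaken for the host or path part of the URL.
    return (
        f"redshift+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


url = redshift_sqlalchemy_url("analyst", "p@ss:w/rd", "cluster.example.com", "mydb")
print(url)
```

With SQLAlchemy 1.4 or later, sqlalchemy.engine.URL.create() accepts the raw password as a separate argument, which avoids manual encoding altogether.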

is_connected property

is_connected: bool

Check if connection is active.

__init__

__init__(host: str | None = None, database: str | None = None, user: str | None = None, password: str | None = None) -> None

Initialize connection parameters.

Parameters:

  • host (str, default: None): Redshift cluster endpoint. Falls back to the REDSHIFT_HOST env var.
  • database (str, default: None): Database name. Falls back to the REDSHIFT_DATABASE env var.
  • user (str, default: None): Username. Falls back to the REDSHIFT_USER env var.
  • password (str, default: None): Password. Falls back to the REDSHIFT_PASSWORD env var.
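Each parameter is resolved once, at construction time, with argument or os.getenv(...). An explicit argument therefore wins, and anything falsy (None or an empty string) falls back to the environment variable. A hypothetical resolve_setting function reproduces that rule in isolation:

```python
from __future__ import annotations

import os


def resolve_setting(explicit: str | None, env_var: str) -> str | None:
    """Mirror the `explicit or os.getenv(env_var)` fallback used in __init__ (hypothetical helper)."""
    return explicit or os.getenv(env_var)


# Demo only: set the variable for this process so the fallback has something to find.
os.environ["REDSHIFT_HOST"] = "env-cluster.example.com"

print(resolve_setting("arg-cluster.example.com", "REDSHIFT_HOST"))  # argument wins
print(resolve_setting(None, "REDSHIFT_HOST"))  # falls back to the env var
print(resolve_setting("", "REDSHIFT_HOST"))  # empty string also falls back
```

Because the lookup happens in __init__, changing an environment variable after creating the object does not affect a later connect() call.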

connect

connect() -> RedshiftConnection

Establish connection to Redshift.

Returns:

  • RedshiftConnection: Self, for method chaining.

Raises:

  • ValueError: If required connection parameters are missing.
  • ImportError: If redshift_connector is not installed.


query

query(sql: str) -> pd.DataFrame

Execute SQL query and return results as a pandas DataFrame.

This is the primary method for retrieving data from Redshift. The SQL is executed, results are fetched, and returned as a DataFrame with column names preserved from the query.

Parameters:

  • sql (str, required): SQL query to execute. Can be any valid Redshift SELECT statement, including JOINs, CTEs, window functions, etc.

Returns:

  • DataFrame: Query results as a DataFrame. Column names match the query output. Empty DataFrame if the query returns no rows.

Raises:

  • RuntimeError: If not connected. Call connect() first.

See Also

execute : Execute SQL without returning results (DDL, DML)
write_from_select : Create table from SELECT query

Notes
  • Large queries can take minutes - test with LIMIT first
  • Previous failed queries are automatically rolled back
  • Memory usage depends on result size - consider chunking large results
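  • TIMESTAMP columns come back as datetime64; DATE columns typically arrive as Python datetime.date objects (object dtype), so convert them with pd.to_datetime() if you need datetime64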
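query() fetches the entire result set into one DataFrame. For results too large for memory, one option is to page through rows with the DB-API cursor method fetchmany(). The sketch below is generic DB-API 2.0 code, not an alx_heor method, and iter_batches is a hypothetical name; it is exercised with sqlite3 only so it runs without a cluster. RedshiftConnection keeps its cursor in the private _cursor attribute, so against Redshift you would open your own cursor on a redshift_connector connection.

```python
from __future__ import annotations

import sqlite3
from collections.abc import Iterator
from typing import Any


def iter_batches(cursor: Any, sql: str, batch_size: int = 10_000) -> Iterator[list[tuple]]:
    """Execute sql and yield rows in lists of at most batch_size (generic DB-API 2.0)."""
    cursor.execute(sql)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:  # an empty sequence means the result set is exhausted
            break
        yield list(rows)


# Demo against an in-memory SQLite table instead of Redshift.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE claims (pat_id TEXT)")
cur.executemany("INSERT INTO claims VALUES (?)", [(str(i),) for i in range(25)])

batch_sizes = [len(b) for b in iter_batches(cur, "SELECT pat_id FROM claims", batch_size=10)]
print(batch_sizes)  # [10, 10, 5]
```

Each batch can be turned into a DataFrame, aggregated, and discarded before the next one is fetched, so peak memory tracks batch_size rather than the full result.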

Examples:

Simple query:

>>> df = conn.query("SELECT * FROM schema.table LIMIT 10")
>>> print(df.head())

Query with JOIN:

>>> sql = '''
...     SELECT c.pat_id, c.from_dt, e.pay_type
...     FROM schema.claims_2024 c
...     JOIN schema.enroll2_2024 e ON c.pat_id = e.pat_id
...     WHERE c.diag1 = 'G700'
...     LIMIT 1000
... '''
>>> df = conn.query(sql)

Query with CTE:

>>> sql = '''
...     WITH cohort AS (
...         SELECT DISTINCT pat_id FROM claims WHERE diag1 = 'G700'
...     )
...     SELECT c.*, r.generic_name
...     FROM cohort c
...     JOIN rx_claims r ON c.pat_id = r.pat_id
... '''
>>> df = conn.query(sql)

execute

execute(sql: str) -> None

Execute SQL statement without returning results.

Parameters:

  • sql (str, required): SQL statement to execute.

Raises:

  • RuntimeError: If not connected. Call connect() first.

get_tables

get_tables(schema: str, keyword: str | None = None) -> list[str]

List tables in a schema.

Parameters:

  • schema (str, required): Schema name (e.g., 'iqvia_pharmetrics_2021q3').
  • keyword (str, default: None): Filter tables containing this keyword.

Returns:

  • list[str]: List of table names.
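The keyword filter in get_tables() is a plain Python substring test (keyword in t) applied after the query, so it is case-sensitive, unlike get_schemas(), which lowercases the keyword. Redshift folds unquoted identifiers to lowercase by default, so a lowercase keyword usually works. For case-insensitive matching on the returned list, a hypothetical filter_names helper could look like this (the table names are example values):

```python
from __future__ import annotations


def filter_names(names: list[str], keyword: str | None = None) -> list[str]:
    """Case-insensitive version of get_tables()'s keyword filter (hypothetical helper)."""
    if not keyword:
        return list(names)
    needle = keyword.lower()
    return [name for name in names if needle in name.lower()]


tables = ["claims_2024", "enroll2_2024", "rx_claims_2024"]
print(filter_names(tables, "CLAIMS"))  # ['claims_2024', 'rx_claims_2024']
```

With a connection, filter_names(conn.get_tables('iqvia_pharmetrics_2024q3'), 'Claims') applies the same filter to real table names.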

Source code in alx_heor\database\redshift.py
def get_tables(self, schema: str, keyword: str | None = None) -> list[str]:
    """List tables in a schema.

    Parameters
    ----------
    schema : str
        Schema name (e.g., 'iqvia_pharmetrics_2021q3').
    keyword : str, optional
        Filter tables containing this keyword.

    Returns
    -------
    list[str]
        List of table names.
    """
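    if not self._conn or not self._cursor:
        raise RuntimeError("Not connected. Call connect() first.")

    sql = f"""
        SELECT t.table_name
        FROM information_schema.tables t
        WHERE t.table_schema = '{schema}'
          AND t.table_type = 'BASE TABLE'
        ORDER BY t.table_name
    """
    self._conn.rollback()  # Handle any previous failed queries
    self._cursor.execute(sql)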
    tables = [row[0] for row in self._cursor.fetchall()]

    if keyword:
        tables = [t for t in tables if keyword in t]

    return tables
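get_tables() interpolates the schema name into the SQL with an f-string, which breaks on names containing a single quote and accepts arbitrary SQL text. The sketch below performs the same lookup with the schema passed as a bound parameter. It uses sqlite3 and a toy `catalog_tables` table as a hypothetical stand-in for `information_schema.tables`. `list_tables` and the sample rows are illustrative only. With redshift_connector the placeholder would be `%s` rather than SQLite's `?`, since its default paramstyle is `format`; verify this against the driver version in use.

```python
import sqlite3
from typing import Optional

# Toy catalog standing in for information_schema.tables (hypothetical data).
catalog = sqlite3.connect(":memory:")
catalog.execute("CREATE TABLE catalog_tables (table_schema TEXT, table_name TEXT, table_type TEXT)")
catalog.executemany(
    "INSERT INTO catalog_tables VALUES (?, ?, ?)",
    [
        ("iqvia_pharmetrics_2024q3", "claims_2024", "BASE TABLE"),
        ("iqvia_pharmetrics_2024q3", "enroll2_2024", "BASE TABLE"),
        ("iqvia_pharmetrics_2024q3", "claims_view", "VIEW"),
        ("o'brien_sandbox", "scratch", "BASE TABLE"),
    ],
)


def list_tables(conn, schema: str, keyword: Optional[str] = None) -> list:
    """Like get_tables(), but the schema name is passed as a bound parameter."""
    rows = conn.execute(
        "SELECT table_name FROM catalog_tables"
        " WHERE table_schema = ? AND table_type = 'BASE TABLE'"
        " ORDER BY table_name",
        (schema,),  # the driver handles quoting; no f-string interpolation
    ).fetchall()
    names = [row[0] for row in rows]
    if keyword:  # same case-sensitive substring filter as get_tables()
        names = [n for n in names if keyword in n]
    return names


print(list_tables(catalog, "iqvia_pharmetrics_2024q3"))            # ['claims_2024', 'enroll2_2024']
print(list_tables(catalog, "iqvia_pharmetrics_2024q3", "claims"))  # ['claims_2024']
print(list_tables(catalog, "o'brien_sandbox"))                      # ['scratch']
```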

get_columns

get_columns(schema: str, table: str, keyword: str | None = None) -> list[str]

List columns in a table.

Parameters:

Name     Type  Default   Description
schema   str   required  Schema name.
table    str   required  Table name.
keyword  str   None      Filter columns containing this keyword.

Returns:

Type       Description
list[str]  List of column names.

Source code in alx_heor\database\redshift.py
def get_columns(self, schema: str, table: str, keyword: str | None = None) -> list[str]:
    """List columns in a table.

    Parameters
    ----------
    schema : str
        Schema name.
    table : str
        Table name.
    keyword : str, optional
        Filter columns containing this keyword.

    Returns
    -------
    list[str]
        List of column names.
    """
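    if not self._conn or not self._cursor:
        raise RuntimeError("Not connected. Call connect() first.")

    sql = f"""
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = '{schema}'
          AND table_name = '{table}'
        ORDER BY ordinal_position
    """
    self._conn.rollback()  # Handle any previous failed queries
    self._cursor.execute(sql)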
    columns = [row[0] for row in self._cursor.fetchall()]

    if keyword:
        columns = [c for c in columns if keyword in c]

    return columns
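A keyword filter often pulls out a numbered family of columns, such as diagnosis slots. Plain string sorting then places `diag10` before `diag2`. A natural-sort key keeps such names in numeric order. The column list below is hypothetical, and `natural_key` is an illustrative helper, not part of the module.

```python
import re


def natural_key(name: str) -> list:
    """Split 'diag10' into ['diag', 10, ''] so digit runs compare as integers."""
    # re.split with one capturing group alternates text (even index) and
    # digits (odd index), so elements at the same position have comparable types.
    return [int(part) if i % 2 else part for i, part in enumerate(re.split(r"(\d+)", name))]


# Hypothetical output of conn.get_columns(schema, "claims_2024", keyword="diag")
diag_cols = ["diag10", "diag2", "diag1", "diag3"]

print(sorted(diag_cols))                   # ['diag1', 'diag10', 'diag2', 'diag3']
print(sorted(diag_cols, key=natural_key))  # ['diag1', 'diag2', 'diag3', 'diag10']
```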

close

close() -> None

Close the database connection.

Source code in alx_heor\database\redshift.py
def close(self) -> None:
    """Close the database connection."""
    if self._conn:
        self._conn.close()
        self._conn = None
        self._cursor = None

__enter__

__enter__() -> RedshiftConnection

Context manager entry.

Source code in alx_heor\database\redshift.py
def __enter__(self) -> RedshiftConnection:
    """Context manager entry."""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb) -> None

Context manager exit - close connection.

Source code in alx_heor\database\redshift.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Context manager exit - close connection."""
    self.close()
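__exit__() calls close(), and close() does nothing once `_conn` is None. Together these mean the `with RedshiftConnection().connect() as conn:` pattern shown earlier on this page closes the connection even when the block raises, and a later explicit close() is harmless. That pattern also assumes connect() returns the instance itself, which this page implies but does not show. The stub below is hypothetical. It copies the close(), __enter__() and __exit__() logic from the source above and replaces the driver connection with a placeholder object.

```python
class FakeDriverConnection:
    """Placeholder for the driver connection; counts close() calls."""

    def __init__(self) -> None:
        self.closed = 0

    def close(self) -> None:
        self.closed += 1


class StubRedshiftConnection:
    """Hypothetical stub with the same close/__enter__/__exit__ logic as above."""

    def __init__(self) -> None:
        self._conn = None
        self._cursor = None

    def connect(self) -> "StubRedshiftConnection":
        self._conn = FakeDriverConnection()
        self._cursor = object()
        return self  # assumed: the real connect() also returns self

    def close(self) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None
            self._cursor = None

    def __enter__(self) -> "StubRedshiftConnection":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()  # returns None, so any exception still propagates


stub = StubRedshiftConnection()
try:
    with stub.connect() as conn:
        driver = conn._conn
        raise ValueError("query failed")
except ValueError:
    pass

print(stub._conn is None, driver.closed)  # True 1
stub.close()  # second close is a no-op
print(driver.closed)  # 1
```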

get_schemas

get_schemas(keyword: str | None = None) -> list[str]

List schemas in the database.

Parameters:

Name     Type  Default  Description
keyword  str   None     Filter schemas containing this keyword (case-insensitive).

Returns:

Type       Description
list[str]  List of schema names.

Example

>>> conn.get_schemas('iqvia')
['iqvia_pharmetrics_2024q3', 'iqvia_pharmetrics_mortality_2024q3', ...]

Source code in alx_heor\database\redshift.py
def get_schemas(self, keyword: str | None = None) -> list[str]:
    """List schemas in the database.

    Parameters
    ----------
    keyword : str, optional
        Filter schemas containing this keyword (case-insensitive).

    Returns
    -------
    list[str]
        List of schema names.

    Example
    -------
    >>> conn.get_schemas('iqvia')
    ['iqvia_pharmetrics_2024q3', 'iqvia_pharmetrics_mortality_2024q3', ...]
    """
    if not self._conn or not self._cursor:
        raise RuntimeError("Not connected. Call connect() first.")

    sql = "SELECT nspname AS schema_name FROM pg_namespace"
    if keyword:
        sql += f" WHERE nspname LIKE '%{keyword.lower()}%'"
    sql += " ORDER BY nspname"

    self._cursor.execute(sql)
    return [row[0] for row in self._cursor.fetchall()]
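get_schemas() wraps the keyword in `%` wildcards for LIKE. In a LIKE pattern an underscore also matches any single character. Most schema names here contain underscores, so a keyword such as `iqvia_p` can match more than intended. Escaping the wildcards avoids this. The sketch uses sqlite3, whose LIKE is case-insensitive for ASCII, together with an explicit ESCAPE character. Redshift accepts the same `LIKE ... ESCAPE '!'` form and also offers ILIKE for case-insensitive matching. The schema names are hypothetical, and `like_contains` is an illustrative helper.

```python
import sqlite3


def like_contains(keyword: str, esc: str = "!") -> str:
    """Build a '%keyword%' pattern in which %, _ and the escape char are literal."""
    escaped = (
        keyword.replace(esc, esc + esc)  # escape the escape character first
        .replace("%", esc + "%")
        .replace("_", esc + "_")
    )
    return f"%{escaped}%"


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE namespaces (nspname TEXT)")
db.executemany(
    "INSERT INTO namespaces VALUES (?)",
    [("iqvia_pharmetrics_2024q3",), ("iqviaxpharmetrics",), ("optum_2024q3",)],
)

raw = db.execute(
    "SELECT nspname FROM namespaces WHERE nspname LIKE ? ORDER BY nspname",
    ("%iqvia_p%",),  # unescaped: '_' matches the 'x' in iqviaxpharmetrics
).fetchall()
escaped = db.execute(
    "SELECT nspname FROM namespaces WHERE nspname LIKE ? ESCAPE '!' ORDER BY nspname",
    (like_contains("iqvia_p"),),
).fetchall()

print([r[0] for r in raw])      # ['iqvia_pharmetrics_2024q3', 'iqviaxpharmetrics']
print([r[0] for r in escaped])  # ['iqvia_pharmetrics_2024q3']
```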

drop_table

drop_table(schema: str, table: str) -> None

Drop a table if it exists.

Parameters:

Name    Type  Default   Description
schema  str   required  Schema name (e.g., 'sbx_username').
table   str   required  Table name.
Example

>>> conn.drop_table('sbx_kskn306', 'temp_cohort')

Source code in alx_heor\database\redshift.py
def drop_table(self, schema: str, table: str) -> None:
    """Drop a table if it exists.

    Parameters
    ----------
    schema : str
        Schema name (e.g., 'sbx_username').
    table : str
        Table name.

    Example
    -------
    >>> conn.drop_table('sbx_kskn306', 'temp_cohort')
    """
    if not self._conn or not self._cursor:
        raise RuntimeError("Not connected. Call connect() first.")

    sql = f"DROP TABLE IF EXISTS {schema}.{table}"
    self._cursor.execute(sql)
    self._conn.commit()
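drop_table() places `schema` and `table` directly into the statement, so a name containing a space or other special characters produces invalid SQL. Standard SQL delimited identifiers avoid that: wrap the name in double quotes and double any embedded double quote. `quote_ident` and `drop_table_sql` are hypothetical helpers. sqlite3 is used only so that the sketch runs locally, and `main` is SQLite's default schema name.

```python
import sqlite3


def quote_ident(name: str) -> str:
    """Return name as a delimited identifier, with embedded double quotes doubled."""
    return '"' + name.replace('"', '""') + '"'


def drop_table_sql(schema: str, table: str) -> str:
    return f"DROP TABLE IF EXISTS {quote_ident(schema)}.{quote_ident(table)}"


db = sqlite3.connect(":memory:")
db.execute('CREATE TABLE "temp cohort" (pat_id TEXT)')  # table name containing a space

stmt = drop_table_sql("main", "temp cohort")
print(stmt)  # DROP TABLE IF EXISTS "main"."temp cohort"
db.execute(stmt)
db.commit()
remaining = db.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
print(remaining)  # []
```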

write_table

write_table(df: DataFrame, schema: str, table: str, if_exists: str = 'replace', chunksize: int = 5000) -> int

Write a DataFrame to a Redshift table.

Uses SQLAlchemy and pandas DataFrame.to_sql to send multi-row INSERT statements in batches of chunksize rows.

Parameters:

Name       Type       Default    Description
df         DataFrame  required   DataFrame to write.
schema     str        required   Target schema name.
table      str        required   Target table name.
if_exists  str        'replace'  What to do if table exists: 'replace', 'append', or 'fail'.
chunksize  int        5000       Number of rows to insert at a time.

Returns:

Type  Description
int   Number of rows written.

Example

>>> conn.write_table(df_cohort, 'sbx_kskn306', 'my_cohort')
1234

Source code in alx_heor\database\redshift.py
def write_table(
    self,
    df: pd.DataFrame,
    schema: str,
    table: str,
    if_exists: str = "replace",
    chunksize: int = 5000,
) -> int:
    """Write a DataFrame to a Redshift table.

    Uses SQLAlchemy and pandas `DataFrame.to_sql` to send multi-row INSERT
    statements in batches of `chunksize` rows.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to write.
    schema : str
        Target schema name.
    table : str
        Target table name.
    if_exists : str, default='replace'
        What to do if table exists: 'replace', 'append', or 'fail'.
    chunksize : int, default=5000
        Number of rows to insert at a time.

    Returns
    -------
    int
        Number of rows written.

    Example
    -------
    >>> conn.write_table(df_cohort, 'sbx_kskn306', 'my_cohort')
    1234
    """
    if not self._conn:
        raise RuntimeError("Not connected. Call connect() first.")

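    try:
        from sqlalchemy import create_engine
    except ImportError:
        raise ImportError(
            "sqlalchemy is required for write_table. "
            "Install with: pip install sqlalchemy sqlalchemy-redshift psycopg2"
        ) from None

    from urllib.parse import quote_plus

    # Build connection string for SQLAlchemy. Credentials are percent-encoded so
    # characters such as '@', ':' or '/' in a password cannot break the URL, and
    # the configured port is used instead of a hard-coded 5439 (assumes the
    # constructor stores it as self.port; falls back to 5439 otherwise).
    port = getattr(self, "port", None) or 5439
    conn_str = (
        f"redshift+psycopg2://{quote_plus(self.user)}:{quote_plus(self.password)}"
        f"@{self.host}:{port}/{self.database}"
    )

    self._conn.rollback()
    engine = create_engine(conn_str)
    try:
        df.to_sql(
            table,
            engine,
            index=False,
            if_exists=if_exists,
            schema=schema,
            chunksize=chunksize,
            method="multi",
        )
    finally:
        engine.dispose()  # Release the engine's pooled connections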

    return len(df)
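write_table() builds its SQLAlchemy URL by string formatting. If the password contains characters that are significant in a URL, such as `@`, `:` or `/`, the URL no longer parses as intended. SQLAlchemy's documentation recommends percent-encoding such values with `urllib.parse.quote_plus`. The sketch builds the URL that way and takes the port as a parameter rather than hard-coding 5439. `build_redshift_url` is a hypothetical helper, and the credentials are placeholders.

```python
from urllib.parse import quote_plus


def build_redshift_url(user: str, password: str, host: str, database: str, port: int = 5439) -> str:
    """Build a redshift+psycopg2 URL with percent-encoded credentials."""
    return (
        f"redshift+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


url = build_redshift_url("analyst", "p@ss:w/rd", "your-cluster.redshift.amazonaws.com", "your_database")
print(url)
# redshift+psycopg2://analyst:p%40ss%3Aw%2Frd@your-cluster.redshift.amazonaws.com:5439/your_database
```

For very large DataFrames, multi-row INSERTs through `to_sql` are generally much slower than Redshift's COPY command. AWS recommends COPY for bulk loads, typically from files staged in S3.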

write_from_select

write_from_select(sql: str, schema: str, table: str, overwrite: bool = True) -> None

Create a table from a SELECT query.

Parameters:

Name       Type  Default   Description
sql        str   required  SELECT statement (without an INTO clause or trailing semicolon).
schema     str   required  Target schema name.
table      str   required  Target table name.
overwrite  bool  True      If True, drop existing table first.
Example

>>> conn.write_from_select(
...     "SELECT pat_id, from_dt FROM claims WHERE diag1 = 'G35'",
...     'sbx_kskn306', 'ms_patients'
... )

Source code in alx_heor\database\redshift.py
def write_from_select(
    self,
    sql: str,
    schema: str,
    table: str,
    overwrite: bool = True,
) -> None:
    """Create a table from a SELECT query.

    Parameters
    ----------
    sql : str
        SELECT statement (without an INTO clause or trailing semicolon).
    schema : str
        Target schema name.
    table : str
        Target table name.
    overwrite : bool, default=True
        If True, drop existing table first.

    Example
    -------
    >>> conn.write_from_select(
    ...     "SELECT pat_id, from_dt FROM claims WHERE diag1 = 'G35'",
    ...     'sbx_kskn306', 'ms_patients'
    ... )
    """
    if not self._conn or not self._cursor:
        raise RuntimeError("Not connected. Call connect() first.")

    if overwrite:
        self.drop_table(schema, table)

    # Build SELECT INTO statement
    into_sql = f"SELECT * INTO {schema}.{table} FROM ({sql}) AS subquery"
    self._conn.rollback()
    self._cursor.execute(into_sql)
    self._conn.commit()
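write_from_select() wraps the caller's query in parentheses. A trailing semicolon, which is common when SQL is pasted from an editor, therefore makes the generated statement invalid. A small normalization step avoids that. `build_select_into` is a hypothetical helper that returns the same `SELECT ... INTO` string the method builds, minus trailing semicolons. Redshift's `CREATE TABLE ... AS SELECT` is an equivalent alternative form.

```python
def build_select_into(sql: str, schema: str, table: str) -> str:
    """Return the SELECT ... INTO statement that write_from_select() executes."""
    inner = sql.strip()
    while inner.endswith(";"):  # drop any trailing semicolons and the whitespace before them
        inner = inner[:-1].rstrip()
    return f"SELECT * INTO {schema}.{table} FROM ({inner}) AS subquery"


stmt = build_select_into(
    "SELECT pat_id, from_dt FROM claims WHERE diag1 = 'G35';\n",
    "sbx_kskn306",
    "ms_patients",
)
print(stmt)
# SELECT * INTO sbx_kskn306.ms_patients FROM (SELECT pat_id, from_dt FROM claims WHERE diag1 = 'G35') AS subquery
```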

format_in_clause

format_in_clause(items: list) -> str

Format a list for use in SQL IN clause.

Parameters:

Name Type Description Default
items list

List of values (strings or numbers).

required

Returns:

Type Description
str

Comma-separated quoted string for SQL IN clause.

Example

>>> format_in_clause(['G35', 'G36', 'G37'])
"'G35', 'G36', 'G37'"
>>> format_in_clause([1, 2, 3])
"'1', '2', '3'"

Notes

PostgreSQL/Redshift has a limit of ~32,767 items in an IN clause. For larger lists, consider using a temp table join instead.

Source code in alx_heor\database\redshift.py
def format_in_clause(items: list) -> str:
    """Format a list for use in SQL IN clause.

    Parameters
    ----------
    items : list
        List of values (strings or numbers).

    Returns
    -------
    str
        Comma-separated quoted string for SQL IN clause.

    Example
    -------
    >>> format_in_clause(['G35', 'G36', 'G37'])
    "'G35', 'G36', 'G37'"
    >>> format_in_clause([1, 2, 3])
    "'1', '2', '3'"

    Notes
    -----
    PostgreSQL/Redshift has a limit of ~32,767 items in an IN clause.
    For larger lists, consider using a temp table join instead.
    """
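    # Double embedded single quotes so values such as O'Brien stay valid SQL literals
    return ", ".join("'" + str(item).replace("'", "''") + "'" for item in items)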
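The note above recommends a temp-table join for very large lists, and that is usually the better choice. A simpler alternative is to split the values into batches, run one query per batch, and concatenate the results. The sketch uses sqlite3 so that it runs locally. `in_list`, `batches` and the claims rows are hypothetical. `in_list` doubles embedded single quotes so that a value like `O'Brien` stays a valid literal. The batch size of 2 is deliberately tiny so that the loop runs more than once.

```python
import sqlite3
from typing import Iterator, List, Sequence


def in_list(items: Sequence) -> str:
    """Quote each value as a SQL string literal, doubling embedded single quotes."""
    return ", ".join("'" + str(item).replace("'", "''") + "'" for item in items)


def batches(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE claims (pat_id TEXT, diag1 TEXT)")
db.executemany(
    "INSERT INTO claims VALUES (?, ?)",
    [("P1", "G35"), ("P2", "G36"), ("P3", "G700"), ("P4", "G37")],
)

codes = ["G35", "G36", "G37"]
matches: List[str] = []
for chunk in batches(codes, size=2):
    sql = f"SELECT pat_id FROM claims WHERE diag1 IN ({in_list(chunk)})"
    matches.extend(row[0] for row in db.execute(sql))

print(sorted(matches))  # ['P1', 'P2', 'P4']
```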