Apache Arrow, parquet and IPC for Golang

Published September 10, 2024

This article is one of many that I want to write to keep track of snippets and notes about how I use Apache Arrow, Parquet and IPC with Go. My plan is to write at least two other articles, one for Python and one for Rust, because my data pipeline intersects those languages and I wasted a good amount of time figuring out how to read from and write to IPC, convert schemas from Arrow to Parquet, and so on.

In Rust the documentation is ok, but with Go I usually end up on articles that use third-party libraries wrapping the native one. Since I have always figured out how to do what I want without any of those, I just want to keep the snippets somewhere I can look at without searching around.

I think this can be useful to other people as well, so here I am. I don’t think these are the most efficient or the right ways to do what I am doing, but they have worked for me so far.

From Arrow to various formats

This is how I convert Arrow to CSV, IPC or Parquet.

// Serialize an in-memory Arrow table into the requested content type and
// return the encoded bytes.
//
// The table reader streams the table as record batches; it is only consumed
// by the CSV and IPC branches (the Parquet writer takes the table directly).
reader := array.NewTableReader(table, -1)
defer reader.Release()

switch contentType {
case Csv:
    result := bytes.NewBuffer([]byte{})
    writer := csv.NewWriter(result, table.Schema())
    for reader.Next() {
        if err := writer.Write(reader.Record()); err != nil {
            return nil, err
        }
    }
    // Flush explicitly so nothing is left buffered in the writer and any
    // deferred write error is surfaced before the bytes are handed out.
    if err := writer.Flush(); err != nil {
        return nil, err
    }
    return result.Bytes(), nil
case Parquet:
    result := bytes.NewBuffer([]byte{})

    props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
    arrProps := pqarrow.DefaultWriterProps()

    writer, err := pqarrow.NewFileWriter(table.Schema(), result, props, arrProps)
    if err != nil {
        return nil, err
    }
    if err := writer.WriteTable(table, 1024*1024); err != nil {
        return nil, err
    }
    // Close finalizes the file, so its error must not be dropped: a failed
    // Close means the buffer does not hold a complete Parquet file.
    if err := writer.Close(); err != nil {
        return nil, err
    }
    return result.Bytes(), nil
case ArrowIpc:
    result := bytes.NewBuffer([]byte{})
    writer := ipc.NewWriter(result, ipc.WithSchema(table.Schema()))
    for reader.Next() {
        if err := writer.Write(reader.Record()); err != nil {
            // Best-effort cleanup; the write error is the one worth reporting.
            _ = writer.Close()
            return nil, err
        }
    }
    // Close must run BEFORE result.Bytes() is evaluated. A deferred Close
    // would execute after the return value was already captured, so whatever
    // Close appends to the stream would be missing from the returned slice.
    if err := writer.Close(); err != nil {
        return nil, err
    }
    return result.Bytes(), nil
}

return nil, errors.New("specified content not supported")

From Parquet, IPC to Arrow

records := []arrow.Record{}
switch contentType {
case ArrowIpc:
    reader, err := ipc.NewReader(inputReader)
    if err != nil {
        return nil, err
    }
    for reader.Next() {
        rec := reader.Record()
        rec.Retain()
        defer rec.Release()
        records = append(records, rec)
    }
    if err := reader.Err(); err != nil {
        return nil, err
    }
    return array.NewTableFromRecords(reader.Schema(), records), nil
case Parquet:
    var err error
    reader, err := file.NewParquetReader(bytes.NewReader(inputReader.Bytes()))
    if err != nil {
        return nil, err
    }
    arrow_reader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    if err != nil {
        return nil, err
    }

    schema, err := arrow_reader.Schema()
    if err != nil {
        return nil, err
    }

    record_reader, err := arrow_reader.GetRecordReader(ctx, nil, nil)
    if err != nil {
        return nil, err
    }

    for record_reader.Next() {
        rec := record_reader.Record()
        rec.Retain()
        defer rec.Release()
        records = append(records, rec)
    }

    return array.NewTableFromRecords(schema, records), nil
default:
    return nil, errors.New("type not supported")
}

From CSV to Arrow

I kept it separate from the previous section because CSV does not really have a schema. For this reason there are two CSV readers: the one you see here infers types and schema by looking at the CSV columns and their content. The second reader requires a schema to be passed, and if the CSV does not follow that schema the reader fails.

    // Read CSV content into an Arrow table, letting the reader infer the
    // schema from the header row and the column contents.
    //
    // NOTE(review): the chunk size is kept at 0 as in the original; check the
    // csv.WithChunk documentation for how the value affects the number of
    // rows per record and how much data type inference gets to see.
    reader := csv.NewInferringReader(
        contentFromRedis,
        csv.WithChunk(0),
        csv.WithHeader(true),
        // Cells matching any of these strings are read as null.
        csv.WithNullReader(true, "", "NULL", "null", "N/A"))
    defer reader.Release()
    for reader.Next() {
        rec := reader.Record()
        // Retain so the record stays valid after the reader advances; it is
        // released when the surrounding function returns.
        rec.Retain()
        defer rec.Release()
        records = append(records, rec)
    }
    if err := reader.Err(); err != nil {
        return nil, err
    }
    return array.NewTableFromRecords(reader.Schema(), records), nil

Are you having trouble figuring out your way to building automation, releasing and troubleshooting your software? Let's get actionable lessons learned straight to you via email.